From ea2914d835c7590196b491a32bed324d4c74b95f Mon Sep 17 00:00:00 2001 From: Jacob Greenfield Date: Wed, 16 Oct 2024 16:40:09 -0400 Subject: [PATCH 1/4] Return `Some` for only one thread when calling `remove()` --- crossbeam-skiplist/src/base.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crossbeam-skiplist/src/base.rs b/crossbeam-skiplist/src/base.rs index f3c90b1f9..af68123dd 100644 --- a/crossbeam-skiplist/src/base.rs +++ b/crossbeam-skiplist/src/base.rs @@ -1157,8 +1157,12 @@ where break; } } + return Some(entry); + } else { + // The node has already been marked. + n.decrement(guard); + return None; } - return Some(entry); } } } From 769c520f16fc62a25fe6ea0e8c2abde8cd0c4f65 Mon Sep 17 00:00:00 2001 From: Jacob Greenfield Date: Thu, 19 Dec 2024 19:40:39 -0500 Subject: [PATCH 2/4] Add test for remove() race condition --- crossbeam-skiplist/tests/base.rs | 50 ++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/crossbeam-skiplist/tests/base.rs b/crossbeam-skiplist/tests/base.rs index 83d2f221e..14e41a805 100644 --- a/crossbeam-skiplist/tests/base.rs +++ b/crossbeam-skiplist/tests/base.rs @@ -900,3 +900,53 @@ fn drops() { assert_eq!(KEYS.load(Ordering::SeqCst), 8); assert_eq!(VALUES.load(Ordering::SeqCst), 7); } + +// https://github.com/crossbeam-rs/crossbeam/pull/1143 +#[cfg(target_has_atomic = "32")] +#[test] +fn remove_race() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let nthreads = 16; + let key_range = 100_000u32; + + let guard = &epoch::pin(); + let s = SkipList::new(epoch::default_collector().clone()); + + for x in 0..key_range { + s.insert(x, (), guard); + } + + let barrier1 = AtomicU32::new(nthreads); + let barrier2 = AtomicU32::new(nthreads); + let mut total_removed = AtomicU32::new(0); + + std::thread::scope(|scope| { + for _ in 0..nthreads { + scope.spawn(|| { + let guard = &epoch::pin(); + let mut removed_entries = Vec::with_capacity(key_range as usize); + + barrier1.fetch_sub(1, Ordering::Relaxed); + while barrier1.load(Ordering::Acquire) != 0 {} + + for x in 0..key_range { + if let Some(entry) = s.remove(&x, guard) { + removed_entries.push(entry); + } + } + + barrier2.fetch_sub(1, Ordering::Relaxed); + while barrier2.load(Ordering::Acquire) != 0 {} + + total_removed.fetch_add(removed_entries.len() as u32, Ordering::Relaxed); + + for entry in removed_entries.drain(..) { + entry.release(guard); + } + }); + } + }); + + assert_eq!(*total_removed.get_mut(), key_range); +} From ea0fbbd66fd083679e7ca7bb0285715af1e2084a Mon Sep 17 00:00:00 2001 From: Jacob Greenfield Date: Thu, 19 Dec 2024 20:27:53 -0500 Subject: [PATCH 3/4] Smaller key range for miri --- crossbeam-skiplist/tests/base.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crossbeam-skiplist/tests/base.rs b/crossbeam-skiplist/tests/base.rs index 1d4043f28..8abcc29e9 100644 --- a/crossbeam-skiplist/tests/base.rs +++ b/crossbeam-skiplist/tests/base.rs @@ -978,6 +978,9 @@ fn remove_race() { use std::sync::atomic::{AtomicU32, Ordering}; let nthreads = 16; + #[cfg(miri)] + let key_range = 100u32; + #[cfg(not(miri))] let key_range = 100_000u32; let guard = &epoch::pin(); From f9369988bc1fe1951a1162e180e84c94c66b1cb9 Mon Sep 17 00:00:00 2001 From: Jacob Greenfield Date: Thu, 19 Dec 2024 20:35:57 -0500 Subject: [PATCH 4/4] Fix memory leak --- crossbeam-skiplist/tests/base.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crossbeam-skiplist/tests/base.rs b/crossbeam-skiplist/tests/base.rs index 8abcc29e9..d12b7fde7 100644 --- a/crossbeam-skiplist/tests/base.rs +++ b/crossbeam-skiplist/tests/base.rs @@ -987,7 +987,7 @@ fn remove_race() { let s = SkipList::new(epoch::default_collector().clone()); for x in 0..key_range { - s.insert(x, (), guard); + s.insert(x, (), guard).release(guard); } let barrier1 = AtomicU32::new(nthreads);