diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index 492e21d9bdb63..707276e7407bc 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -86,6 +86,18 @@ pub(crate) struct Channel { receivers: SyncWaker, } +/// The state of the channel after calling `start_recv` or `start_send`. +#[derive(PartialEq, Eq)] +enum Status { + /// The channel is ready to read or write to. + Ready, + /// There is currently a send or receive in progress holding up the queue. + /// All operations must block to preserve linearizability. + InProgress, + /// The channel is empty. + Empty, +} + impl Channel { /// Creates a bounded channel of capacity `cap`. pub(crate) fn with_capacity(cap: usize) -> Self { @@ -122,7 +134,7 @@ impl Channel { } /// Attempts to reserve a slot for sending a message. - fn start_send(&self, token: &mut Token) -> bool { + fn start_send(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -131,7 +143,7 @@ impl Channel { if tail & self.mark_bit != 0 { token.array.slot = ptr::null(); token.array.stamp = 0; - return true; + return Status::Ready; } // Deconstruct the tail. @@ -166,7 +178,7 @@ impl Channel { // Prepare the token for the follow-up call to `write`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = tail + 1; - return true; + return Status::Ready; } Err(_) => { backoff.spin_light(); @@ -180,10 +192,16 @@ impl Channel { // If the head lags one lap behind the tail as well... if head.wrapping_add(self.one_lap) == tail { // ...then the channel is full. - return false; + return Status::Empty; + } + + // The head was advanced but the stamp hasn't been updated yet, + // meaning a receive is in-progress. Spin for a bit waiting for + // the receive to complete before falling back to blocking. + if !backoff.try_spin_light() { + return Status::InProgress; } - backoff.spin_light(); tail = self.tail.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. @@ -200,10 +218,10 @@ impl Channel { return Err(msg); } - let slot: &Slot = &*(token.array.slot as *const Slot); + let slot: &Slot = unsafe { &*token.array.slot.cast::>() }; // Write the message into the slot and update the stamp. - slot.msg.get().write(MaybeUninit::new(msg)); + unsafe { slot.msg.get().write(MaybeUninit::new(msg)) } slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping receiver. @@ -212,7 +230,7 @@ impl Channel { } /// Attempts to reserve a slot for receiving a message. - fn start_recv(&self, token: &mut Token) -> bool { + fn start_recv(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); @@ -249,7 +267,7 @@ impl Channel { // Prepare the token for the follow-up call to `read`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = head.wrapping_add(self.one_lap); - return true; + return Status::Ready; } Err(_) => { backoff.spin_light(); @@ -267,14 +285,20 @@ impl Channel { // ...then receive an error. token.array.slot = ptr::null(); token.array.stamp = 0; - return true; + return Status::Ready; } else { // Otherwise, the receive operation is not ready. - return false; + return Status::Empty; } } - backoff.spin_light(); + // The tail was advanced but the stamp hasn't been updated yet, + // meaning a send is in-progress. Spin for a bit waiting for + // the send to complete before falling back to blocking. + if !backoff.try_spin_light() { + return Status::InProgress; + } + head = self.head.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. @@ -291,10 +315,10 @@ impl Channel { return Err(()); } - let slot: &Slot = &*(token.array.slot as *const Slot); + let slot: &Slot = unsafe { &*token.array.slot.cast::>() }; // Read the message from the slot and update the stamp. - let msg = slot.msg.get().read().assume_init(); + let msg = unsafe { slot.msg.get().read().assume_init() }; slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping sender. @@ -304,11 +328,13 @@ impl Channel { /// Attempts to send a message into the channel. pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { - let token = &mut Token::default(); - if self.start_send(token) { - unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } - } else { - Err(TrySendError::Full(msg)) + match self.send_blocking(msg, None, false) { + Ok(None) => Ok(()), + Ok(Some(msg)) => Err(TrySendError::Full(msg)), + Err(SendTimeoutError::Disconnected(msg)) => Err(TrySendError::Disconnected(msg)), + Err(SendTimeoutError::Timeout(_)) => { + unreachable!("called recv_blocking with deadline: None") + } } } @@ -318,12 +344,43 @@ impl Channel { msg: T, deadline: Option, ) -> Result<(), SendTimeoutError> { + self.send_blocking(msg, deadline, true) + .map(|value| assert!(value.is_none(), "called send_blocking with block: true")) + } + + /// Sends a message into the channel. + /// + /// Blocks until a message is sent if `should_block` is `true`. Otherwise, returns `Ok(Some(msg))` if + /// the channel is full. + /// + /// Note this method may still block when `should_block` is `false` if the channel is in an inconsistent state. + pub(crate) fn send_blocking( + &self, + msg: T, + deadline: Option, + should_block: bool, + ) -> Result, SendTimeoutError> { let token = &mut Token::default(); + let mut state = self.senders.start(); loop { - // Try sending a message. - if self.start_send(token) { - let res = unsafe { self.write(token, msg) }; - return res.map_err(SendTimeoutError::Disconnected); + // Try sending a message several times. + let backoff = Backoff::new(); + loop { + match self.start_send(token) { + Status::Ready => { + let res = unsafe { self.write(token, msg) }; + return res.map(|_| None).map_err(SendTimeoutError::Disconnected); + } + // If the channel is full, return or block immediately. + Status::Empty if !should_block => return Ok(Some(msg)), + Status::Empty => break, + // Otherwise spin for a bit before blocking. + Status::InProgress => {} + } + + if !backoff.try_spin_light() { + break; + } } if let Some(d) = deadline { @@ -335,7 +392,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - self.senders.register(oper, cx); + self.senders.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_full() || self.is_disconnected() { @@ -353,28 +410,61 @@ impl Channel { Selected::Operation(_) => {} } }); + + state.unpark(); } } /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { - let token = &mut Token::default(); - - if self.start_recv(token) { - unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } - } else { - Err(TryRecvError::Empty) + match self.recv_blocking(None, false) { + Ok(Some(value)) => Ok(value), + Ok(None) => Err(TryRecvError::Empty), + Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected), + Err(RecvTimeoutError::Timeout) => { + unreachable!("called recv_blocking with deadline: None") + } } } /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { + self.recv_blocking(deadline, true) + .map(|value| value.expect("called recv_blocking with block: true")) + } + + /// Receives a message from the channel. + /// + /// Blocks until a message is received if `should_block` is `true`. Otherwise, returns `Ok(None)` if + /// the channel is full. + /// + /// Note this may still block when `should_block` is `false` if the channel is in an inconsistent state. + pub(crate) fn recv_blocking( + &self, + deadline: Option, + should_block: bool, + ) -> Result, RecvTimeoutError> { let token = &mut Token::default(); + let mut state = self.receivers.start(); loop { - // Try receiving a message. - if self.start_recv(token) { - let res = unsafe { self.read(token) }; - return res.map_err(|_| RecvTimeoutError::Disconnected); + // Try receiving a message several times. + let backoff = Backoff::new(); + loop { + match self.start_recv(token) { + Status::Ready => { + let res = unsafe { self.read(token) }; + return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected); + } + // If the channel is empty, return or block immediately. + Status::Empty if !should_block => return Ok(None), + Status::Empty => break, + // Otherwise spin for a bit before blocking. + Status::InProgress => {} + } + + if !backoff.try_spin_light() { + break; + } } if let Some(d) = deadline { @@ -386,7 +476,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - self.receivers.register(oper, cx); + self.receivers.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -406,6 +496,8 @@ impl Channel { Selected::Operation(_) => {} } }); + + state.unpark(); } } diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs index 9e7148c716cda..b5d04d486d18b 100644 --- a/library/std/src/sync/mpmc/list.rs +++ b/library/std/src/sync/mpmc/list.rs @@ -9,7 +9,7 @@ use super::waker::SyncWaker; use crate::cell::UnsafeCell; use crate::marker::PhantomData; use crate::mem::MaybeUninit; -use crate::ptr; +use crate::ptr::{self, NonNull}; use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use crate::time::Instant; @@ -42,12 +42,14 @@ struct Slot { } impl Slot { - /// Waits until a message is written into the slot. - fn wait_write(&self) { - let backoff = Backoff::new(); - while self.state.load(Ordering::Acquire) & WRITE == 0 { - backoff.spin_heavy(); - } + const UNINIT: Self = + Self { msg: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0) }; + + /// Blocks until a message is written into the slot. + fn wait_write(&self, receivers: &SyncWaker, token: &mut Token) { + watch_until(receivers, token, || { + (self.state.load(Ordering::SeqCst) & WRITE != 0).then_some(()) + }) } } @@ -65,25 +67,12 @@ struct Block { impl Block { /// Creates an empty block. fn new() -> Block { - // SAFETY: This is safe because: - // [1] `Block::next` (AtomicPtr) may be safely zero initialized. - // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. - // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it - // holds a MaybeUninit. - // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. - unsafe { MaybeUninit::zeroed().assume_init() } + Block { next: AtomicPtr::new(ptr::null_mut()), slots: [Slot::UNINIT; BLOCK_CAP] } } - /// Waits until the next pointer is set. - fn wait_next(&self) -> *mut Block { - let backoff = Backoff::new(); - loop { - let next = self.next.load(Ordering::Acquire); - if !next.is_null() { - return next; - } - backoff.spin_heavy(); - } + /// Blocks until the next pointer is set. + fn wait_next(&self, receivers: &SyncWaker, token: &mut Token) -> *mut Self { + watch_until(receivers, token, || NonNull::new(self.next.load(Ordering::SeqCst))).as_ptr() } /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. @@ -91,7 +80,7 @@ impl Block { // It is not necessary to set the `DESTROY` bit in the last slot because that slot has // begun destruction of the block. for i in start..BLOCK_CAP - 1 { - let slot = (*this).slots.get_unchecked(i); + let slot = unsafe { (*this).slots.get_unchecked(i) }; // Mark the `DESTROY` bit if a thread is still using the slot. if slot.state.load(Ordering::Acquire) & READ == 0 @@ -103,7 +92,7 @@ impl Block { } // No thread is using the block, now it is safe to destroy it. - drop(Box::from_raw(this)); + drop(unsafe { Box::from_raw(this) }); } } @@ -155,6 +144,18 @@ pub(crate) struct Channel { _marker: PhantomData, } +/// The status of the channel after calling `start_recv`. +#[derive(PartialEq, Eq)] +enum Status { + /// The channel has a message ready to read. + Ready, + /// There is currently a send in progress holding up the queue. + /// All operations must block to preserve linearizability. + InProgress, + /// The channel is empty. + Empty, +} + impl Channel { /// Creates a new unbounded channel. pub(crate) fn new() -> Self { @@ -173,7 +174,7 @@ impl Channel { } /// Attempts to reserve a slot for sending a message. - fn start_send(&self, token: &mut Token) -> bool { + fn start_send(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); let mut block = self.tail.block.load(Ordering::Acquire); @@ -183,15 +184,19 @@ impl Channel { // Check if the channel is disconnected. if tail & MARK_BIT != 0 { token.list.block = ptr::null(); - return true; + return Status::Ready; } // Calculate the offset of the index into the block. let offset = (tail >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. + // If we've been waiting for too long, fallback to blocking. if offset == BLOCK_CAP { - backoff.spin_heavy(); + if !backoff.try_spin_light() { + return Status::InProgress; + } + tail = self.tail.index.load(Ordering::Acquire); block = self.tail.block.load(Ordering::Acquire); continue; @@ -239,12 +244,12 @@ impl Channel { let next_block = Box::into_raw(next_block.unwrap()); self.tail.block.store(next_block, Ordering::Release); self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); - (*block).next.store(next_block, Ordering::Release); + (*block).next.store(next_block, Ordering::SeqCst); } token.list.block = block as *const u8; token.list.offset = offset; - return true; + return Status::Ready; }, Err(_) => { backoff.spin_light(); @@ -263,11 +268,11 @@ impl Channel { } // Write the message into the slot. - let block = token.list.block as *mut Block; + let block = token.list.block.cast::>(); let offset = token.list.offset; - let slot = (*block).slots.get_unchecked(offset); - slot.msg.get().write(MaybeUninit::new(msg)); - slot.state.fetch_or(WRITE, Ordering::Release); + let slot = unsafe { (*block).slots.get_unchecked(offset) }; + unsafe { slot.msg.get().write(MaybeUninit::new(msg)) } + slot.state.fetch_or(WRITE, Ordering::SeqCst); // Wake a sleeping receiver. self.receivers.notify(); @@ -275,7 +280,7 @@ impl Channel { } /// Attempts to reserve a slot for receiving a message. - fn start_recv(&self, token: &mut Token) -> bool { + fn start_recv(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut head = self.head.index.load(Ordering::Acquire); let mut block = self.head.block.load(Ordering::Acquire); @@ -284,9 +289,14 @@ impl Channel { // Calculate the offset of the index into the block. let offset = (head >> SHIFT) % LAP; - // If we reached the end of the block, wait until the next one is installed. + // We reached the end of the block but the block is not installed yet, meaning + // the last send on the previous block is still in progress. The send is likely to + // complete soon so we spin here before falling back to blocking. if offset == BLOCK_CAP { - backoff.spin_heavy(); + if !backoff.try_spin_light() { + return Status::InProgress; + } + head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); continue; @@ -304,10 +314,10 @@ impl Channel { if tail & MARK_BIT != 0 { // ...then receive an error. token.list.block = ptr::null(); - return true; + return Status::Ready; } else { // Otherwise, the receive operation is not ready. - return false; + return Status::Empty; } } @@ -317,10 +327,14 @@ impl Channel { } } - // The block can be null here only if the first message is being sent into the channel. - // In that case, just wait until it gets initialized. + // The block can be null here only if the first message sent into the channel is + // in progress. The send is likely to complete soon so we spin here before falling + // back to blocking. if block.is_null() { - backoff.spin_heavy(); + if !backoff.try_spin_light() { + return Status::InProgress; + } + head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); continue; @@ -336,7 +350,7 @@ impl Channel { Ok(_) => unsafe { // If we've reached the end of the block, move to the next one. if offset + 1 == BLOCK_CAP { - let next = (*block).wait_next(); + let next = (*block).wait_next(&self.receivers, token); let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= MARK_BIT; @@ -348,7 +362,7 @@ impl Channel { token.list.block = block as *const u8; token.list.offset = offset; - return true; + return Status::Ready; }, Err(_) => { backoff.spin_light(); @@ -369,16 +383,18 @@ impl Channel { // Read the message. let block = token.list.block as *mut Block; let offset = token.list.offset; - let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let msg = slot.msg.get().read().assume_init(); + let slot = unsafe { (*block).slots.get_unchecked(offset) }; + slot.wait_write(&self.receivers, token); + let msg = unsafe { slot.msg.get().read().assume_init() }; // Destroy the block if we've reached the end, or if another thread wanted to destroy but // couldn't because we were busy reading from the slot. - if offset + 1 == BLOCK_CAP { - Block::destroy(block, 0); - } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { - Block::destroy(block, offset + 1); + unsafe { + if offset + 1 == BLOCK_CAP { + Block::destroy(block, 0); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset + 1); + } } Ok(msg) @@ -399,28 +415,106 @@ impl Channel { _deadline: Option, ) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); - assert!(self.start_send(token)); - unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) } + + // It's possible that we can't proceed because of the sender that + // is supposed to install the next block lagging, so we might have to + // block for a message to be sent. + let mut state = self.receivers.start(); + let mut started = false; + loop { + // Try sending a message several times. + let backoff = Backoff::new(); + loop { + if started || self.start_send(token) == Status::Ready { + return unsafe { + self.write(token, msg).map_err(SendTimeoutError::Disconnected) + }; + } + + // Spin for a bit before blocking. + if !backoff.try_spin_light() { + break; + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + // Register to be notified after any message is sent. + let oper = Operation::hook(token); + self.receivers.watch(oper, cx, &state); + + // Has the channel become ready just now? + if self.start_send(token) == Status::Ready { + let _ = cx.try_select(Selected::Aborted); + started = true; + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unwatch(oper); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); + } } /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { - let token = &mut Token::default(); - - if self.start_recv(token) { - unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } - } else { - Err(TryRecvError::Empty) + match self.recv_blocking(None, false) { + Ok(Some(value)) => Ok(value), + Ok(None) => Err(TryRecvError::Empty), + Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected), + Err(RecvTimeoutError::Timeout) => { + unreachable!("called recv_blocking with deadline: None") + } } } /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { + self.recv_blocking(deadline, true) + .map(|value| value.expect("called recv_blocking with block: true")) + } + + /// Receives a message from the channel. + /// + /// Blocks until a message is received if `should_block` is `true`. Otherwise, returns `Ok(None)` if + /// the channel is full. + /// + /// Note this may still block when `should_block` is `false` if the channel is in an inconsistent state. + pub(crate) fn recv_blocking( + &self, + deadline: Option, + should_block: bool, + ) -> Result, RecvTimeoutError> { let token = &mut Token::default(); + + let mut state = self.receivers.start(); loop { - if self.start_recv(token) { - unsafe { - return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + // Try receiving a message several times. + let backoff = Backoff::new(); + loop { + match self.start_recv(token) { + Status::Ready => { + let res = unsafe { self.read(token) }; + return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected); + } + // If the channel is empty, return or block immediately. + Status::Empty if !should_block => return Ok(None), + Status::Empty => break, + // Otherwise spin for a bit before blocking. + Status::InProgress => {} + } + + if !backoff.try_spin_light() { + break; } } @@ -433,7 +527,7 @@ impl Channel { // Prepare for blocking until a sender wakes us up. Context::with(|cx| { let oper = Operation::hook(token); - self.receivers.register(oper, cx); + self.receivers.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -452,6 +546,8 @@ impl Channel { } Selected::Operation(_) => {} } + + state.unpark(); }); } } @@ -531,6 +627,7 @@ impl Channel { /// /// This method should only be called when all receivers are dropped. fn discard_all_messages(&self) { + let token = &mut Token::default(); let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); loop { @@ -548,8 +645,8 @@ impl Channel { let mut head = self.head.index.load(Ordering::Acquire); // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts - // to initalize the first block before noticing that the receivers disconnected. Late allocations - // will be deallocated by the sender in Drop. + // to initialize the first block before noticing that the receivers disconnected. Late allocations + // will be deallocated by the sender in Drop let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel); // If we're going to be dropping messages we need to synchronize with initialization @@ -572,11 +669,10 @@ impl Channel { if offset < BLOCK_CAP { // Drop the message in the slot. let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let p = &mut *slot.msg.get(); - p.as_mut_ptr().drop_in_place(); + slot.wait_write(&self.receivers, token); + (*slot.msg.get()).assume_init_drop(); } else { - (*block).wait_next(); + (*block).wait_next(&self.receivers, token); // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Acquire); drop(Box::from_raw(block)); @@ -591,7 +687,6 @@ impl Channel { drop(Box::from_raw(block)); } } - head &= !MARK_BIT; self.head.index.store(head, Ordering::Release); } @@ -614,11 +709,61 @@ impl Channel { } } +/// Blocks until a read operation succeeds, returning the value once it does. +fn watch_until(receivers: &SyncWaker, token: &mut Token, try_read: impl Fn() -> Option) -> T { + let mut state = receivers.start(); + let mut value = None; + + loop { + // Try reading the message several times. + let backoff = Backoff::new(); + loop { + if value.is_none() { + value = try_read(); + } + + if let Some(value) = value { + return value; + } + + if !backoff.try_spin_light() { + break; + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + // Register to be notified after any message is sent. + let oper = Operation::hook(token); + receivers.watch(oper, cx, &state); + + // Was the message just sent? + if let Some(read) = try_read() { + value = Some(read); + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + receivers.unwatch(oper); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); + } +} + impl Drop for Channel { fn drop(&mut self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let mut tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let mut head = *self.head.index.get_mut(); + let mut tail = *self.tail.index.get_mut(); + let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); @@ -632,11 +777,10 @@ impl Drop for Channel { if offset < BLOCK_CAP { // Drop the message in the slot. let slot = (*block).slots.get_unchecked(offset); - let p = &mut *slot.msg.get(); - p.as_mut_ptr().drop_in_place(); + (*slot.msg.get()).assume_init_drop(); } else { // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs index 0cbc61160f7ee..252b80423f7d1 100644 --- a/library/std/src/sync/mpmc/utils.rs +++ b/library/std/src/sync/mpmc/utils.rs @@ -121,6 +121,24 @@ impl Backoff { self.step.set(self.step.get() + 1); } + /// Backs off using lightweight spinning + /// + /// Returns `false` if backoff has completed and blocking the thread is advised. + #[inline] + pub fn try_spin_light(&self) -> bool { + if self.step.get() > SPIN_LIMIT { + return false; + } + + let step = self.step.get().min(SPIN_LIMIT); + for _ in 0..step.pow(2) { + crate::hint::spin_loop(); + } + + self.step.set(self.step.get() + 1); + true + } + /// Backs off using heavyweight spinning. /// /// This method should be used in blocking loops where parking the thread is not an option. diff --git a/library/std/src/sync/mpmc/waker.rs b/library/std/src/sync/mpmc/waker.rs index 9aab1b9417edb..c6fdb5f73599c 100644 --- a/library/std/src/sync/mpmc/waker.rs +++ b/library/std/src/sync/mpmc/waker.rs @@ -4,7 +4,7 @@ use super::context::Context; use super::select::{Operation, Selected}; use crate::ptr; -use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::atomic::{AtomicU32, Ordering}; use crate::sync::Mutex; /// Represents a thread blocked on a specific channel operation. @@ -94,6 +94,18 @@ impl Waker { } } + /// Registers an operation waiting to be ready. + #[inline] + pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { + self.observers.push(Entry { oper, packet: ptr::null_mut(), cx: cx.clone() }); + } + + /// Unregisters an operation waiting to be ready. + #[inline] + pub(crate) fn unwatch(&mut self, oper: Operation) { + self.observers.retain(|e| e.oper != oper); + } + /// Notifies all operations waiting to be ready. #[inline] pub(crate) fn notify(&mut self) { @@ -137,66 +149,201 @@ pub(crate) struct SyncWaker { /// The inner `Waker`. inner: Mutex, - /// `true` if the waker is empty. - is_empty: AtomicBool, + /// Atomic state for this waker. + state: WakerState, } impl SyncWaker { /// Creates a new `SyncWaker`. #[inline] pub(crate) fn new() -> Self { - SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) } + SyncWaker { inner: Mutex::new(Waker::new()), state: WakerState::new() } + } + + /// Returns a token that can be used to manage the state of a blocking operation. + pub(crate) fn start(&self) -> BlockingState<'_> { + BlockingState { is_waker: false, waker: self } } /// Registers the current thread with an operation. #[inline] - pub(crate) fn register(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock().unwrap(); - inner.register(oper, cx); - self.is_empty - .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); + pub(crate) fn register(&self, oper: Operation, cx: &Context, state: &BlockingState<'_>) { + self.inner.lock().unwrap().register(oper, cx); + self.state.park(state.is_waker); } /// Unregisters an operation previously registered by the current thread. #[inline] pub(crate) fn unregister(&self, oper: Operation) -> Option { - let mut inner = self.inner.lock().unwrap(); - let entry = inner.unregister(oper); - self.is_empty - .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); - entry + self.inner.lock().unwrap().unregister(oper) } /// Attempts to find one thread (not the current one), select its operation, and wake it up. #[inline] pub(crate) fn notify(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut inner = self.inner.lock().unwrap(); - if !self.is_empty.load(Ordering::SeqCst) { - inner.try_select(); - inner.notify(); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); - } + if self.state.try_notify() { + self.notify_one() } } + // Finds a thread (not the current one), select its operation, and wake it up. + #[inline] + pub(crate) fn notify_one(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.try_select(); + inner.notify(); + } + + /// Registers an operation waiting to be ready. + /// + /// All watching threads are notified when a relevant operation completes, instead of being + /// put in the registration queue. + #[inline] + pub(crate) fn watch(&self, oper: Operation, cx: &Context, state: &BlockingState<'_>) { + self.inner.lock().unwrap().watch(oper, cx); + self.state.park(state.is_waker); + } + + /// Unregisters an operation waiting to be ready. + #[inline] + pub(crate) fn unwatch(&self, oper: Operation) { + let mut inner = self.inner.lock().unwrap(); + inner.unwatch(oper); + } + /// Notifies all threads that the channel is disconnected. #[inline] pub(crate) fn disconnect(&self) { let mut inner = self.inner.lock().unwrap(); inner.disconnect(); - self.is_empty - .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); } } impl Drop for SyncWaker { #[inline] fn drop(&mut self) { - debug_assert!(self.is_empty.load(Ordering::SeqCst)); + debug_assert!(!self.state.has_waiters()); + } +} + +/// A guard that manages the state of a blocking operation. +pub(crate) struct BlockingState<'a> { + /// True if this thread is the waker thread, meaning it must + /// try to notify waiters after it completes. + is_waker: bool, + + waker: &'a SyncWaker, +} + +impl BlockingState<'_> { + /// Reset the state after waking up from parking. + #[inline] + pub(crate) fn unpark(&mut self) { + self.is_waker = self.waker.state.unpark(); + } +} + +impl Drop for BlockingState<'_> { + fn drop(&mut self) { + if self.is_waker && self.waker.state.drop_waker() { + self.waker.notify_one(); + } + } +} + +const NOTIFIED: u32 = 0b001; +const WAKER: u32 = 0b010; + +/// The state of a `SyncWaker`. +/// +/// A waker manages a count of waiting threads, as well as a "waker thread". +/// The waker thread has the task of waking up another thread after it succeeds, +/// transferring ownership of notifications. If there is an active waker thread, +/// additional threads are not notified. This throttles wakeups, reducing contention +/// on the channel, as well as creates a chain that makes up for lost wakeups, +/// allowing threads to park even if the channel is in an inconsistent state. +struct WakerState { + state: AtomicU32, +} + +impl WakerState { + /// Initialize the waker state. + fn new() -> WakerState { + WakerState { state: AtomicU32::new(0) } + } + + /// Returns whether or not a waiter needs to be notified. + fn try_notify(&self) -> bool { + // because storing a value in the channel is also sequentially consistent, + // this creates a total order between storing a value and registering a waiter. + let state = self.state.load(Ordering::SeqCst); + + // if a notification is already set, the waker thread will take care + // of further notifications. otherwise we have to notify if there are waiters + if (state >> WAKER) > 0 && (state & NOTIFIED == 0) { + return self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // set the notification if there are waiters and it is not already set + if (state >> WAKER) > 0 && (state & NOTIFIED == 0) { + Some(state | (WAKER | NOTIFIED)) + } else { + None + } + }) + .is_ok(); + } + + false + } + + /// Get ready for this waker to park. The channel should be checked after calling this + /// method, and before parking. + fn park(&self, waker: bool) { + // increment the waiter count. if we are the waker thread, we also have to remove the + // notification to allow other waiters to be notified after we park + let update = (1_u32 << WAKER).wrapping_sub(u32::from(waker)); + self.state.fetch_add(update, Ordering::SeqCst); + } + + /// Remove this waiter from the waker state after it was unparked. + /// + /// Returns `true` if this thread became the waking thread and must call `drop_waker` + /// after it completes it's operation. + fn unpark(&self) -> bool { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // decrement the waiter count and consume the waker token + Some((state - (1 << WAKER)) & !WAKER) + }) + // did we consume the token and become the waker thread? + .map(|state| state & WAKER != 0) + .unwrap() + } + + /// Called by the waking thread after completing it's operation. + /// + /// Returns `true` if a waiter should be notified. + fn drop_waker(&self) -> bool { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // if there are waiters, set the waker token and wake someone, transferring the + // waker thread. otherwise unset the notification so new waiters can synchronize + // with new notifications + Some(if (state >> WAKER) > 0 { + state | WAKER + } else { + state.wrapping_sub(NOTIFIED) + }) + }) + // were there waiters? + .map(|state| (state >> WAKER) > 0) + .unwrap() + } + + /// Returns `true` if there are active waiters. + fn has_waiters(&self) -> bool { + (self.state.load(Ordering::Relaxed) >> WAKER) > 0 } }