
std::sync::mpsc::Receiver::try_recv can block forever if sending thread is blocked #112723

Open
benhansen-io opened this issue Jun 16, 2023 · 20 comments
Labels
A-concurrency (Area: Concurrency), C-bug (Category: This is a bug.), T-libs (Relevant to the library team, which will review and decide on the PR/issue.)

Comments


benhansen-io commented Jun 16, 2023

I tried this code:

use std::{
    sync::{mpsc, Arc},
    time::{Duration, Instant},
};

use thread_priority::ThreadPriority;

fn main() {
    const PINNED_CORE: usize = 2;

    let (sender, receiver) = mpsc::channel::<usize>();

    std::thread::Builder::new()
        .name("sending".to_owned())
        .spawn(move || {
            thread_priority::set_current_thread_priority(ThreadPriority::Min).unwrap();
            core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });

            loop {
                sender.send(42).unwrap();
            }
        })
        .unwrap();

    let num_received = Arc::new(std::sync::atomic::AtomicUsize::new(0));

    std::thread::Builder::new()
        .name("receiving".to_owned())
        .spawn({
            let num_received = num_received.clone();
            move || {
                thread_priority::set_current_thread_priority(ThreadPriority::Max).unwrap();
                core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });

                loop {
                    let start = Instant::now();
                    let try_receive_result = receiver.try_recv();
                    let elapsed = start.elapsed();
                    if elapsed > Duration::from_secs(1) {
                        println!("try_recv blocked for {:.2} seconds", elapsed.as_secs_f32());
                    }
                    match try_receive_result {
                        Ok(_) => {
                            num_received.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                        }
                        Err(mpsc::TryRecvError::Empty) => {
                            std::thread::sleep(Duration::from_millis(200));
                        }
                        Err(mpsc::TryRecvError::Disconnected) => unreachable!(),
                    }
                }
            }
        })
        .unwrap();

    loop {
        std::thread::sleep(Duration::from_millis(500));
        println!(
            "Receiving thread has received {}",
            num_received.load(std::sync::atomic::Ordering::SeqCst)
        )
    }
}

(full crate code available at https://github.com/benhansen-io/mpsc_deadlock_reproducer)

Based on the following documentation:

This method will never block the caller in order to wait for data to become available. Instead, this will always return immediately with a possible option of pending data on the channel.

I would not expect try_recv to ever block, but the output shows lines such as:

Receiving thread has received 740113466                                                                                                 
Receiving thread has received 740113466                                                                                                 
Receiving thread has received 740113466                                                                                                 
try_recv blocked for 26.77 seconds                                                                                                      
Receiving thread has received 740354709                                                                                                 
Receiving thread has received 747929297                                                                                                 
Receiving thread has received 754959588

When a deadlock is happening I get the following backtraces:

Backtrace of the sending thread:

#0  core::sync::atomic::atomic_or<usize> (dst=0x7fffec243ac8, val=1, order=<optimized out>) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/sync/atomic.rs:3329
#1  core::sync::atomic::AtomicUsize::fetch_or (self=0x7fffec243ac8, val=1, order=<optimized out>) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/sync/atomic.rs:2645
#2  std::sync::mpmc::list::Channel<usize>::write<usize> (self=0x5555555d4c00, token=0x7ffff7c8c910, msg=42) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/list.rs:270
#3  0x0000555555572949 in std::sync::mpmc::list::Channel<usize>::send<usize> (self=0x5555555d4c00, msg=42, _deadline=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/list.rs:403
#4  0x0000555555570015 in std::sync::mpmc::Sender<usize>::send<usize> (self=0x7ffff7c8cb10, msg=42) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/mod.rs:128
#5  0x0000555555566ad3 in std::sync::mpsc::Sender<usize>::send<usize> (self=0x7ffff7c8cb10, t=42) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpsc/mod.rs:614
#6  0x0000555555573e8a in mpsc_deadlock_reproducer::main::{closure#0} () at src/main.rs:20
#7  0x000055555556ac23 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#8  0x000055555556ef63 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#9  0x0000555555572d93 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>> (self=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#10 0x0000555555564d86 in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (data=0x7ffff7c8cc90)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#11 0x00005555555650fb in __rust_try ()
#12 0x0000555555564b0e in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>> (f=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#13 0x0000555555565343 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (f=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panic.rs:140
#14 0x000055555556e9bc in std::thread::{impl#0}::spawn_unchecked_::{closure#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:525
#15 0x00005555555611ee in core::ops::function::FnOnce::call_once<std::thread::{impl#0}::spawn_unchecked_::{closure_env#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/ops/function.rs:250
#16 0x0000555555597085 in alloc::boxed::{impl#45}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> () at library/alloc/src/boxed.rs:1973
#17 alloc::boxed::{impl#45}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> () at library/alloc/src/boxed.rs:1973
#18 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#19 0x00007ffff7d1844b in ?? () from /usr/lib/libc.so.6
#20 0x00007ffff7d9be40 in ?? () from /usr/lib/libc.so.6

Backtrace of the receiving thread:

#0  0x00007ffff7d804fb in sched_yield () from /usr/lib/libc.so.6
#1  0x0000555555570586 in std::sync::mpmc::utils::Backoff::spin_heavy (self=0x7ffff7a8b664)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/utils.rs:130
#2  0x0000555555570a7f in std::sync::mpmc::list::Slot<usize>::wait_write<usize> (self=0x7fffec243ac0)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/list.rs:49
#3  0x000055555557266a in std::sync::mpmc::list::Channel<usize>::read<usize> (self=0x5555555d4c00, token=0x7ffff7a8b800)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/list.rs:373
#4  0x0000555555572cbc in std::sync::mpmc::list::Channel<usize>::try_recv<usize> (self=0x5555555d4c00)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/list.rs:411
#5  0x00005555555701ba in std::sync::mpmc::Receiver<usize>::try_recv<usize> (self=0x7ffff7a8bba0)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpmc/mod.rs:290
#6  0x0000555555566af6 in std::sync::mpsc::Receiver<usize>::try_recv<usize> (self=0x7ffff7a8bba0)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sync/mpsc/mod.rs:801
#7  0x0000555555573f9c in mpsc_deadlock_reproducer::main::{closure#1} () at src/main.rs:37
#8  0x000055555556ac59 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>
    (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#9  0x000055555556ef45 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#10 0x0000555555572d58 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>> (self=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#11 0x0000555555564dfe in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (data=0x7ffff7a8bc70)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#12 0x00005555555650fb in __rust_try ()
#13 0x0000555555564bae in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>> (f=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#14 0x0000555555565366 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (f=...)

try_recv calling read, which calls wait_write and thus causes try_recv to wait on the sender, seems fundamentally wrong.

Meta

rustc --version --verbose:

rustc 1.70.0 (90c541806 2023-05-31)
binary: rustc
commit-hash: 90c541806f23a127002de5b4038be731ba1458ca
commit-date: 2023-05-31
host: x86_64-unknown-linux-gnu
release: 1.70.0
LLVM version: 16.0.2
@benhansen-io benhansen-io added the C-bug Category: This is a bug. label Jun 16, 2023
@saethlin saethlin added the T-libs Relevant to the library team, which will review and decide on the PR/issue. label Jun 17, 2023
the8472 (Member) commented Jun 17, 2023

Since the std channels are mostly based on crossbeam, have you checked if it affects them too? If so then an upstream fix which we can pick up would be most appropriate.

benhansen-io (Author) commented:

I just verified the issue happens upstream. I opened crossbeam-rs/crossbeam#997 and updated the reproducer crate to optionally use crossbeam instead of std.

taiki-e (Member) commented Jun 17, 2023

cc @ibraheemdev

ibraheemdev (Member) commented:

Unfortunately, try_recv must spin in certain cases to preserve linearizability. If it didn't, a stalled sender could lead to confusing behavior:

thread::spawn(|| tx.send(1)); // stalls during send(1)
tx.send(2);
let x = rx.try_recv(); // Err(Empty), because it's waiting on send(1) and doesn't see send(2)

The previous implementation had a similar spinning case. It's unlikely that either of these cases should cause noticeable delays, but the issue is exacerbated in the example here by the thread priorities on a pinned core.

Fixing this issue would either require rewriting the channel to be lock-free, which would add a considerable amount of complexity that we've been trying to avoid, or returning potentially inconsistent results from try_recv. The documentation does seem to technically allow such behavior:

This method will never block the caller in order to wait for data to become available. Instead, this will always return immediately with a possible option of pending data on the channel.

but realistically, it's likely to do more harm than good.
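The slot-reservation constraint described above can be seen in a stripped-down model. This is a hypothetical sketch, not the actual std/crossbeam code: the `Slot` and `Observed` names and the `claimed` flag are invented for illustration. It shows the three states a reader can observe, and why the in-progress state forces a choice between spinning (linearizable, but can block) and returning Empty (non-blocking, but not linearizable):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// One channel slot in a simplified two-step send: a sender first
// claims the slot, then separately marks the payload as written
// (the real channel sets a write flag in `Channel::write`, as the
// backtraces above show).
struct Slot {
    value: AtomicUsize,
    written: AtomicBool,
}

#[derive(Debug, PartialEq)]
enum Observed {
    Value(usize),
    Empty,
    // A sender claimed the slot but has not finished writing. A
    // linearizable try_recv must spin here (`wait_write`); a strictly
    // non-blocking one would have to report Empty and give up
    // linearizability.
    InProgress,
}

// `claimed` stands in for "the channel index says a send has started".
fn observe(slot: &Slot, claimed: bool) -> Observed {
    if !claimed {
        return Observed::Empty;
    }
    if slot.written.load(Ordering::Acquire) {
        Observed::Value(slot.value.load(Ordering::Relaxed))
    } else {
        Observed::InProgress
    }
}
```

The `InProgress` arm is exactly where the receiving thread in the backtraces is stuck: the slot is claimed, but the low-priority sender never gets scheduled to finish the write.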

the8472 (Member) commented Jul 30, 2023

Fixing this issue would either require rewriting the channel to be lock-free [...] or returning potentially inconsistent results from try_recv.

Is switching to a notification mechanism when it has to spin for an unexpectedly long time not an option?

ibraheemdev (Member) commented:

@the8472 the question here is about try_recv, where we don't want to block at all in the first place.

the8472 (Member) commented Jul 30, 2023

I don't see how yielding for an extended amount of time is an improvement over blocking. If the assumption that the spin will not last long is violated, then the correct response is to coordinate with the thing we're waiting for. That coordination means telling the OS that A (a high-priority thread) is waiting on X, and that X will have to run before A can make progress, which allows the OS to priority-boost X and stop scheduling A.

ibraheemdev (Member) commented Jul 30, 2023

@the8472 you're right, my comment was more about how there needs to be some form of blocking (whether that's spinning, or a proper parking mechanism) in order to maintain correctness. A notification mechanism would likely be an improvement in this case, but still potentially problematic for the wasm issue mentioned here.

That being said, the notification mechanism in question is still not as straightforward as the one currently implemented. The original sender needs a way of knowing it has to wake up multiple receivers, or each receiver woken up must continue the notification chain until the channel is visibly empty (this applies to crossbeam's MPMC channel; the fix for std might be simpler, but would require deviating from crossbeam, which the libs team has expressed they want to avoid).

the8472 (Member) commented Jul 30, 2023

If this only happens in exceptional circumstances then I hope the wait/wakeup dance could be conditionally enabled through a shared atomic for the entire set of senders/receivers. As long as it doesn't get mutated frequently it shouldn't add contention. Though I haven't thought much about the necessary order to achieve that, lost wakeups can be tricky.
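One way to picture the flag-gated fallback: spinning stays the fast path, and only a receiver that has exhausted its spin budget arms a wakeup that senders then pay for. This is a rough sketch under my own naming (`WaitGate` is not a real std or crossbeam type), and it deliberately glosses over the re-check of channel state needed to avoid lost wakeups:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};

// Sketch: the receiver only arms the gate after it has spun "too
// long"; the sender pays for a notification only when the flag is
// set, so the uncontended send path adds just one atomic load.
struct WaitGate {
    armed: AtomicBool,
    lock: Mutex<()>,
    cond: Condvar,
}

impl WaitGate {
    fn new() -> Self {
        WaitGate {
            armed: AtomicBool::new(false),
            lock: Mutex::new(()),
            cond: Condvar::new(),
        }
    }

    /// Receiver side: called once spinning has exceeded its budget.
    /// A real implementation must re-check the channel state after
    /// arming, before sleeping, to avoid a lost wakeup.
    fn wait(&self) {
        let guard = self.lock.lock().unwrap();
        self.armed.store(true, Ordering::SeqCst);
        let _guard = self.cond.wait(guard).unwrap();
        self.armed.store(false, Ordering::SeqCst);
    }

    /// Sender side: called after a write completes; cheap when no
    /// receiver is parked.
    fn notify(&self) {
        if self.armed.load(Ordering::SeqCst) {
            let _guard = self.lock.lock().unwrap();
            self.cond.notify_all();
        }
    }
}
```

The design choice here matches the comment above: as long as the gate is rarely armed, the shared flag adds no meaningful contention to the send path.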

benhansen-io (Author) commented:

I agree that linearizability is at odds with being completely non-blocking in an implementation where writers reserve a slot before completing their write. I also think linearizability is probably being depended on in the real world (it's interesting because, from what I could see, it is not guaranteed in the standard library documentation, while non-blocking is).

Perhaps we should update the try_recv documentation to something like the following:

Attempts to return a pending value on this receiver without waiting for a sender to start sending an item.

If no sender has started sending an item, this will return `Err([TryRecvError::Empty])`.

This function may block on a sender that has started but not finished writing an item to the channel until the write has completed.

We could later optionally add another try_recv_non_blocking function that is guaranteed to not block but does not provide linearizability if desired. I personally don't need it but sounds like the WASM case might.

the8472 (Member) commented Aug 15, 2023

Unfortunately, try_recv must spin in certain cases to preserve linearizability. If it didn't, a stalled sender could lead to confusing behavior:

thread::spawn(|| tx.send(1)); // stalls during send(1)
tx.send(2);
let x = rx.try_recv(); // Err(Empty), because it's waiting on send(1) and doesn't see send(2)

Maybe that's acceptable? Any user of try_recv should already be able to handle Err(Empty) results.
And if they know for sure that there should be at least one value they could use recv instead.
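As a caller-side illustration of this suggestion (helper name is my own), blocking with recv expresses "a value is definitely coming" directly, so the receiver tolerates a sender that has started but not yet finished its write:

```rust
use std::sync::mpsc;
use std::thread;

// When the protocol guarantees a message is in flight, recv() is the
// right call: it waits for the sender to finish, whereas
// try_recv().unwrap() would rely on the linearizable behavior under
// discussion in this issue.
fn recv_known_value() -> i32 {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        tx.send(1).unwrap(); // may be mid-write when the receiver polls
    });
    let v = rx.recv().unwrap(); // blocks until the send completes
    handle.join().unwrap();
    v
}
```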

kyrias (Contributor) commented Aug 15, 2023

Returning Err(Empty) in this case is the behavior we want and that we built our system expecting. The current behavior violates the documented promise that the method will never block the caller to wait for data to become available, which is exactly what's happening here.


In the meantime, does anyone know of a maintained MPSC channel which actually provides this guarantee? This is a very big problem for us.

the8472 (Member) commented Aug 15, 2023

In the meantime, does anyone know of a maintained MPSC channel which actually provides this guarantee? This is a very big problem for us.

You could try flume; its spinning seems to be an optional feature.

kyrias (Contributor) commented Aug 15, 2023

I tried that earlier today and it looks promising, though even with spin disabled it still uses a spinlock in one place, which I ended up hitting a watchdog timeout on. Tomorrow I'm going to see if I can make that one optional as well.

benhansen-io (Author) commented:

does anyone know of a maintained MPSC channel which actually provides this guarantee?

https://github.com/benhansen-io/crossbeam/tree/try_recv_no_block has a single commit that should make crossbeam's unbounded channel's try_recv truly non-blocking.

And if they know for sure that there should be at least one value they could use recv instead.

This is a really good point.

ibraheemdev (Member) commented Aug 15, 2023

And if they know for sure that there should be at least one value they could use recv instead.

That's not entirely true. I often use .try_lock().unwrap() when I know the lock is available, for example. try_recv().unwrap() may be used in similar circumstances, though the documentation seems to say it should not be. I agree, though, that none of the common use cases I have seen rely on that.
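For comparison, the try_lock().unwrap() pattern mentioned above looks like this (a minimal example of the pattern, not code from this issue):

```rust
use std::sync::Mutex;

// The caller knows the lock is uncontended at this point, so
// try_lock().unwrap() documents that expectation: a panic here
// means the invariant was violated. try_recv().unwrap() can be
// used the same way when a value is known to be queued.
fn bump(counter: &Mutex<u32>) -> u32 {
    let mut guard = counter.try_lock().unwrap();
    *guard += 1;
    *guard
}
```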

kyrias (Contributor) commented Aug 15, 2023

https://github.com/benhansen-io/crossbeam/tree/try_recv_no_block has a single commit that should make crossbeam's unbounded channel's try_recv truly non-blocking.

We have the same issue on try_send because it has the other side of the same logic. Are you planning on trying to get this functionality upstreamed btw?

benhansen-io (Author) commented:

Depending on the result of the discussions here, I am happy to do some work to apply the same logic to try_send and the other channel types (e.g. bounded). It is still not clear to me that we can drop the linearizability guarantee even though it is not documented: Hyrum's Law would have me believe we will break some people who depend on linearizability. Maybe some breakage is okay, given that the guarantee wasn't documented and the fix of using recv instead of unwrapping try_recv is simple.

There are other considerations too. For example, the ready:stress_recv test also starts to fail with the above change. In that test, a single receiver thread selects for readiness and gets a notification that a channel is ready, but try_recv still sometimes returns Empty. It turns out that is_ready also only checks that a write has started, not that it has completed. is_ready returning true while try_recv returns Err would be unexpected to me. We could change is_ready to check that the next slot is actually written, at some (minor?) performance cost, but I would need to think about it more to make sure we still wake up listeners correctly.

kyrias (Contributor) commented Aug 15, 2023

It is still not clear to me that we can drop the linearizability guarantee even though it is not documented. Hyrum's Law would have me believe we will break some people that depend on the linearizability. Maybe some breakage is okay given that the guarantee wasn't documented and the fix of using recv instead of unwrapping try_recv is simple.

The question is though if there's any other solution that doesn't involve continuing to break the documented guarantee, that these methods won't block.

There are other considerations too. For example, the ready:stress_recv test also starts to fail with the above change. In that test, a single receiver thread selects for readiness and gets a notification that a channel is ready, but try_recv still sometimes returns Empty. It turns out that is_ready also only checks that a write has started, not that it has completed. is_ready returning true while try_recv returns Err would be unexpected to me. We could change is_ready to check that the next slot is actually written, at some (minor?) performance cost, but I would need to think about it more to make sure we still wake up listeners correctly.

Hmm, this makes me wonder: how does this interact with bounded channels with a capacity of 0?
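For context on the zero-capacity question: sync_channel(0) has no buffer at all, so a send can only complete by rendezvousing with a receive, and try_recv can only succeed while some sender is parked inside send. A minimal check of the idle case (helper name is mine):

```rust
use std::sync::mpsc;

// With capacity 0 there is no slot to reserve ahead of time:
// try_recv on an idle rendezvous channel simply reports Empty.
fn rendezvous_empty() -> bool {
    let (_tx, rx) = mpsc::sync_channel::<u8>(0);
    rx.try_recv() == Err(mpsc::TryRecvError::Empty)
}
```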

kyrias (Contributor) commented Aug 17, 2023

As of flume 0.11, the remaining spinlock that existed with all features disabled has been removed, and flume now appears to be a usable alternative until the std channel is fixed.
