From e6c129e571180cca5c246aa0a4644dedc004a150 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 11 Aug 2021 20:51:50 -0400 Subject: [PATCH] fix eventfd logic and prevent multiple registrations Commit 8874895 introduced a regression that manifests as a SIGSEGV. The reason for that is that the patch introduced Source reuse, in order to implement read coalescing. Calls to take_result() were transformed into result(), leaving the original Result in place for other consumers to peek. This is all fine for user-visible Sources, but our eventfd manipulation is a bit more intricate, and relied on the presence or absence of a Result to decide whether or not to install a new eventfd. As a result of the change, every time the reactor is invoked after the eventfd fired for the first time, a new eventfd source is registered, which is wrong. But what makes this problem really nasty is the fact that the kernel already knows about the previous address. When we register a new Source, we also add a new buffer. The kernel, though, will write to the previous buffer which was already freed when we installed the new one. Aside from fixing the issue, this patch also does the following: - adds more comments about eventfd, so people can more easily follow what's going on - adds an assert on fill_sqe's read. I can't imagine anyone outside of eventfd having a legitimate excuse to reuse a Source like this. - Adds a new buffer type, special for Eventfd. Not only this is good defensively, because even upon multiple registration the kernel would keep writing always to the same location, it also saves an allocation. Fixes #396 (cherry picked from commit 72b626f9fa4972513cb9664a834cf6cab13b695a) --- glommio/src/sys/dma_buffer.rs | 3 +++ glommio/src/sys/source.rs | 11 +++++++++++ glommio/src/sys/uring.rs | 31 +++++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/glommio/src/sys/dma_buffer.rs b/glommio/src/sys/dma_buffer.rs index 98377a711..a1fc56ef2 100644 --- a/glommio/src/sys/dma_buffer.rs +++ b/glommio/src/sys/dma_buffer.rs @@ -52,6 +52,7 @@ impl Drop for SysAlloc { pub(crate) enum BufferStorage { Sys(SysAlloc), Uring(UringBuffer), + EventFd(*mut u8), } impl BufferStorage { @@ -59,6 +60,7 @@ impl BufferStorage { match self { BufferStorage::Sys(x) => x.as_ptr(), BufferStorage::Uring(x) => x.as_ptr(), + BufferStorage::EventFd(x) => *x as *const u8, } } @@ -66,6 +68,7 @@ impl BufferStorage { match self { BufferStorage::Sys(x) => x.as_mut_ptr(), BufferStorage::Uring(x) => x.as_mut_ptr(), + BufferStorage::EventFd(x) => *x, } } } diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index 2b4295b15..58d121f7a 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -232,6 +232,17 @@ impl Source { self.inner.borrow_mut().wakers.waiters.push(waker) } + // used for eventfd, and other internal sources that reuse the same source + // across many invocations + pub(super) fn take_result(&self) -> Option> { + self.inner + .borrow_mut() + .wakers + .result + .take() + .map(|x| OsResult::from(x).into()) + } + pub(super) fn raw(&self) -> RawFd { self.inner.borrow().raw } diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index c08c33d17..340ead016 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -300,6 +300,10 @@ fn fill_sqe( if let SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), slot) = &mut x.source_type { + // If you have a buffer here, that very likely means you are reusing the + // source. The kernel knows about that buffer already, and will write to + // it. So this can only be called if there is no buffer attached to it. + assert!(slot.is_none()); *slot = Some(IoBuffer::Dma(buf)); } else { unreachable!("Expected Read source type"); @@ -855,6 +859,7 @@ struct SleepableRing { stats: RingIoStats, task_queue_stats: AHashMap, source_map: Rc>, + eventfd_buffer: Vec, } impl SleepableRing { @@ -875,6 +880,7 @@ impl SleepableRing { stats: RingIoStats::new(), task_queue_stats: AHashMap::new(), source_map, + eventfd_buffer: Vec::with_capacity(8), }) } @@ -917,6 +923,8 @@ impl SleepableRing { } fn install_eventfd(&mut self, eventfd_src: &Source) -> bool { + assert!(eventfd_src.result().is_none()); + if let Some(mut sqe) = self.ring.prepare_sqe() { self.waiting_submission += 1; // Now must wait on the eventfd in case someone wants to wake us up. @@ -931,11 +939,23 @@ impl SleepableRing { ), args: UringOpDescriptor::Read(0, 8), }; - let allocator = self.allocator.clone(); + + let buffer_ptr = { + let mut src_type = eventfd_src.source_type_mut(); + *src_type = SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), None); + assert!(self.eventfd_buffer.capacity() >= 8); + self.eventfd_buffer.as_mut_ptr() + }; + fill_sqe( &mut sqe, &op, - |size| allocator.new_buffer(size), + |size| { + Some(DmaBuffer::with_storage( + size, + BufferStorage::EventFd(buffer_ptr), + )) + }, &mut *self.source_map.borrow_mut(), ); true @@ -971,7 +991,7 @@ impl SleepableRing { return self.ring.submit_sqes().map(|x| x as usize); } - let res = eventfd_src.result(); + let res = eventfd_src.take_result(); match res { None => { // We already have the eventfd registered and nobody woke us up so far. @@ -1100,7 +1120,10 @@ pub(crate) struct Reactor { // This keeps the eventfd alive. Drop will close it when we're done notifier: Arc, - // This is the source used to handle the notifications into the ring + // This is the source used to handle the notifications into the ring. + // It is reused, unlike the timeout src, because it is possible and likely + // that it will be in the ring through many calls to the reactor loop. It only ever gets + // completed if this reactor is woken up from another one eventfd_src: Source, source_map: Rc>, }