diff --git a/glommio/src/sys/dma_buffer.rs b/glommio/src/sys/dma_buffer.rs index 98377a711..a1fc56ef2 100644 --- a/glommio/src/sys/dma_buffer.rs +++ b/glommio/src/sys/dma_buffer.rs @@ -52,6 +52,7 @@ impl Drop for SysAlloc { pub(crate) enum BufferStorage { Sys(SysAlloc), Uring(UringBuffer), + EventFd(*mut u8), } impl BufferStorage { @@ -59,6 +60,7 @@ impl BufferStorage { match self { BufferStorage::Sys(x) => x.as_ptr(), BufferStorage::Uring(x) => x.as_ptr(), + BufferStorage::EventFd(x) => *x as *const u8, } } @@ -66,6 +68,7 @@ impl BufferStorage { match self { BufferStorage::Sys(x) => x.as_mut_ptr(), BufferStorage::Uring(x) => x.as_mut_ptr(), + BufferStorage::EventFd(x) => *x, } } } diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index 2b4295b15..58d121f7a 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -232,6 +232,17 @@ impl Source { self.inner.borrow_mut().wakers.waiters.push(waker) } + // used for eventfd, and other internal sources that reuse the same source + // across many invocations + pub(super) fn take_result(&self) -> Option> { + self.inner + .borrow_mut() + .wakers + .result + .take() + .map(|x| OsResult::from(x).into()) + } + pub(super) fn raw(&self) -> RawFd { self.inner.borrow().raw } diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index c08c33d17..340ead016 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -300,6 +300,10 @@ fn fill_sqe( if let SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), slot) = &mut x.source_type { + // If you have a buffer here, that very likely means you are reusing the + // source. The kernel knows about that buffer already, and will write to + // it. So this can only be called if there is no buffer attached to it. + assert!(slot.is_none()); *slot = Some(IoBuffer::Dma(buf)); } else { unreachable!("Expected Read source type"); @@ -855,6 +859,7 @@ struct SleepableRing { stats: RingIoStats, task_queue_stats: AHashMap, source_map: Rc>, + eventfd_buffer: Vec, } impl SleepableRing { @@ -875,6 +880,7 @@ impl SleepableRing { stats: RingIoStats::new(), task_queue_stats: AHashMap::new(), source_map, + eventfd_buffer: Vec::with_capacity(8), }) } @@ -917,6 +923,8 @@ impl SleepableRing { } fn install_eventfd(&mut self, eventfd_src: &Source) -> bool { + assert!(eventfd_src.result().is_none()); + if let Some(mut sqe) = self.ring.prepare_sqe() { self.waiting_submission += 1; // Now must wait on the eventfd in case someone wants to wake us up. @@ -931,11 +939,23 @@ impl SleepableRing { ), args: UringOpDescriptor::Read(0, 8), }; - let allocator = self.allocator.clone(); + + let buffer_ptr = { + let mut src_type = eventfd_src.source_type_mut(); + *src_type = SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), None); + assert!(self.eventfd_buffer.capacity() >= 8); + self.eventfd_buffer.as_mut_ptr() + }; + fill_sqe( &mut sqe, &op, - |size| allocator.new_buffer(size), + |size| { + Some(DmaBuffer::with_storage( + size, + BufferStorage::EventFd(buffer_ptr), + )) + }, &mut *self.source_map.borrow_mut(), ); true @@ -971,7 +991,7 @@ impl SleepableRing { return self.ring.submit_sqes().map(|x| x as usize); } - let res = eventfd_src.result(); + let res = eventfd_src.take_result(); match res { None => { // We already have the eventfd registered and nobody woke us up so far. @@ -1100,7 +1120,10 @@ pub(crate) struct Reactor { // This keeps the eventfd alive. Drop will close it when we're done notifier: Arc, - // This is the source used to handle the notifications into the ring + // This is the source used to handle the notifications into the ring. + // It is reused, unlike the timeout src, because it is possible and likely + // that it will be in the ring through many calls to the reactor loop. It only ever gets + // completed if this reactor is woken up from another one eventfd_src: Source, source_map: Rc>, }