Skip to content

Commit

Permalink
fix eventfd logic and prevent multiple registrations
Browse files Browse the repository at this point in the history
Commit 8874895 introduced a regression that manifests as a SIGSEGV.
The reason for that is that the patch introduced Source reuse, in order
to implement read coalescing. Calls to take_result() were transformed into
result(), leaving the original Result in place for other consumers to
peek.

This is all fine for user-visible Sources, but our eventfd manipulation
is a bit more intricate, and relied on the presence or absence of a
Result to decide whether or not to install a new eventfd.

As a result of the change, every time the reactor is invoked after the
eventfd fired for the first time, a new eventfd source is registered,
which is wrong.

But what makes this problem really nasty is the fact that the kernel
already knows about the previous address. When we register a new Source,
we also add a new buffer. The kernel, though, will write to the previous
buffer which was already freed when we installed the new one.

Aside from fixing the issue, this patch also does the following:
- adds more comments about eventfd, so people can more easily follow
  what's going on
- adds an assert on fill_sqe's read. I can't imagine anyone outside
  of eventfd having a legitimate excuse to reuse a Source like this.
- Adds a new buffer type, special for Eventfd. Not only this is good
  defensively, because even upon multiple registration the kernel would
  keep writing always to the same location, it also saves an allocation.

Fixes #396

(cherry picked from commit 72b626f)
  • Loading branch information
Glauber Costa committed Aug 12, 2021
1 parent ab745cd commit e6c129e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
3 changes: 3 additions & 0 deletions glommio/src/sys/dma_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,23 @@ impl Drop for SysAlloc {
pub(crate) enum BufferStorage {
Sys(SysAlloc),
Uring(UringBuffer),
EventFd(*mut u8),
}

impl BufferStorage {
fn as_ptr(&self) -> *const u8 {
match self {
BufferStorage::Sys(x) => x.as_ptr(),
BufferStorage::Uring(x) => x.as_ptr(),
BufferStorage::EventFd(x) => *x as *const u8,
}
}

fn as_mut_ptr(&mut self) -> *mut u8 {
match self {
BufferStorage::Sys(x) => x.as_mut_ptr(),
BufferStorage::Uring(x) => x.as_mut_ptr(),
BufferStorage::EventFd(x) => *x,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions glommio/src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ impl Source {
self.inner.borrow_mut().wakers.waiters.push(waker)
}

// used for eventfd, and other internal sources that reuse the same source
// across many invocations
pub(super) fn take_result(&self) -> Option<io::Result<usize>> {
self.inner
.borrow_mut()
.wakers
.result
.take()
.map(|x| OsResult::from(x).into())
}

pub(super) fn raw(&self) -> RawFd {
self.inner.borrow().raw
}
Expand Down
31 changes: 27 additions & 4 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ fn fill_sqe<F>(
if let SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), slot) =
&mut x.source_type
{
// If you have a buffer here, that very likely means you are reusing the
// source. The kernel knows about that buffer already, and will write to
// it. So this can only be called if there is no buffer attached to it.
assert!(slot.is_none());
*slot = Some(IoBuffer::Dma(buf));
} else {
unreachable!("Expected Read source type");
Expand Down Expand Up @@ -855,6 +859,7 @@ struct SleepableRing {
stats: RingIoStats,
task_queue_stats: AHashMap<TaskQueueHandle, RingIoStats>,
source_map: Rc<RefCell<SourceMap>>,
eventfd_buffer: Vec<u8>,
}

impl SleepableRing {
Expand All @@ -875,6 +880,7 @@ impl SleepableRing {
stats: RingIoStats::new(),
task_queue_stats: AHashMap::new(),
source_map,
eventfd_buffer: Vec::with_capacity(8),
})
}

Expand Down Expand Up @@ -917,6 +923,8 @@ impl SleepableRing {
}

fn install_eventfd(&mut self, eventfd_src: &Source) -> bool {
assert!(eventfd_src.result().is_none());

if let Some(mut sqe) = self.ring.prepare_sqe() {
self.waiting_submission += 1;
// Now must wait on the eventfd in case someone wants to wake us up.
Expand All @@ -931,11 +939,23 @@ impl SleepableRing {
),
args: UringOpDescriptor::Read(0, 8),
};
let allocator = self.allocator.clone();

let buffer_ptr = {
let mut src_type = eventfd_src.source_type_mut();
*src_type = SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), None);
assert!(self.eventfd_buffer.capacity() >= 8);
self.eventfd_buffer.as_mut_ptr()
};

fill_sqe(
&mut sqe,
&op,
|size| allocator.new_buffer(size),
|size| {
Some(DmaBuffer::with_storage(
size,
BufferStorage::EventFd(buffer_ptr),
))
},
&mut *self.source_map.borrow_mut(),
);
true
Expand Down Expand Up @@ -971,7 +991,7 @@ impl SleepableRing {
return self.ring.submit_sqes().map(|x| x as usize);
}

let res = eventfd_src.result();
let res = eventfd_src.take_result();
match res {
None => {
// We already have the eventfd registered and nobody woke us up so far.
Expand Down Expand Up @@ -1100,7 +1120,10 @@ pub(crate) struct Reactor {

// This keeps the eventfd alive. Drop will close it when we're done
notifier: Arc<sys::SleepNotifier>,
// This is the source used to handle the notifications into the ring
// This is the source used to handle the notifications into the ring.
// It is reused, unlike the timeout src, because it is possible and likely
// that it will be in the ring through many calls to the reactor loop. It only ever gets
// completed if this reactor is woken up from another one
eventfd_src: Source,
source_map: Rc<RefCell<SourceMap>>,
}
Expand Down

0 comments on commit e6c129e

Please sign in to comment.