Skip to content

Commit

Permalink
Merge pull request rust-lang#4037 from tiif/blockpipe
Browse files Browse the repository at this point in the history
Refactor AnonSocket::read/write for blocking socketpair
  • Loading branch information
RalfJung authored Nov 24, 2024
2 parents cef4b64 + 5680283 commit 1c6bfbb
Showing 1 changed file with 80 additions and 44 deletions.
124 changes: 80 additions & 44 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
let mut readbuf = readbuf.borrow_mut();
if readbuf.buf.is_empty() {
if readbuf.borrow().buf.is_empty() {
if self.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
Expand All @@ -167,31 +166,8 @@ impl FileDescription for AnonSocket {
}
}
}

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = self.peer_fd().upgrade() {
ecx.check_and_update_readiness(&peer_fd)?;
}

ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
anonsocket_read(self, self.peer_fd().upgrade(), &mut bytes, ptr, dest, ecx)
}

fn write<'tcx>(
Expand Down Expand Up @@ -221,9 +197,8 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe");
};
let mut writebuf = writebuf.borrow_mut();
let data_size = writebuf.buf.len();
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
Expand All @@ -233,24 +208,85 @@ impl FileDescription for AnonSocket {
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
}
}
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
}
}

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);
/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
available_space: usize,
peer_fd: &FileDescriptionRef,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
};
let mut writebuf = writebuf.borrow_mut();

// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(peer_fd)?;

ecx.return_write_success(actual_write_size, dest)
}

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
anonsocket: &AnonSocket,
peer_fd: Option<FileDescriptionRef>,
bytes: &mut [u8],
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(readbuf) = &anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe")
};
let mut readbuf = readbuf.borrow_mut();

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(bytes).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;

ecx.return_write_success(actual_write_size, dest)
}

ecx.return_read_success(ptr, bytes, actual_read_size, dest)
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
Expand Down

0 comments on commit 1c6bfbb

Please sign in to comment.