From 160b871de80347560d495b8b75e9fc2d1c4075cc Mon Sep 17 00:00:00 2001 From: Theo Bulut Date: Fri, 26 Jan 2024 22:01:15 +0100 Subject: [PATCH 1/3] Cargo fmt and cargo clippy --- examples/Cargo.toml | 4 ++ examples/proactor-config-fwrite.rs | 69 ++++++++++++++++++++ src/async_io.rs | 28 ++++----- src/config.rs | 73 ++++++++++++++++++++++ src/proactor.rs | 14 ++--- src/syscore/linux/iouring/fs/buffer.rs | 4 +- src/syscore/linux/iouring/fs/mod.rs | 4 +- src/syscore/linux/iouring/fs/store_file.rs | 12 ++-- src/syscore/linux/iouring/iouring.rs | 47 +++++++------- src/syscore/linux/iouring/mod.rs | 4 +- src/syscore/linux/iouring/net/multishot.rs | 4 +- src/syscore/linux/iouring/nethandle.rs | 20 +++--- src/syscore/linux/iouring/processor.rs | 68 ++++++++++---------- 13 files changed, 250 insertions(+), 101 deletions(-) create mode 100644 examples/proactor-config-fwrite.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7d217c6..36e12a1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,6 +53,10 @@ path = "h1-server.rs" name = "h1-server-multishot" path = "h1-server-multishot.rs" +[[example]] +name = "proactor-config-fwrite" +path = "proactor-config-fwrite.rs" + [[example]] name = "tcp-server" path = "tcp-server.rs" diff --git a/examples/proactor-config-fwrite.rs b/examples/proactor-config-fwrite.rs new file mode 100644 index 0000000..c289ba9 --- /dev/null +++ b/examples/proactor-config-fwrite.rs @@ -0,0 +1,69 @@ +use nuclei::*; +use std::fs::{File, OpenOptions}; +use std::io; +use std::path::PathBuf; +use std::time::Duration; + +use futures::io::SeekFrom; +use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use nuclei::config::{IoUringConfiguration, NucleiConfig}; + +const DARK_MATTER_TEXT: &'static str = "\ +Dark matter is a form of matter thought to account for approximately \ +85% of the matter in the universe and about a quarter of its total \ +mass–energy density or about 2.241×10−27 kg/m3. Its presence is implied \ +in a variety of astrophysical observations, including gravitational effects \ +that cannot be explained by accepted theories of gravity unless more matter \ +is present than can be seen. For this reason, most experts think that dark \ +matter is abundant in the universe and that it has had a strong influence \ +on its structure and evolution. Dark matter is called dark because it does \ +not appear to interact with the electromagnetic field, which means it doesn't \ +absorb, reflect or emit electromagnetic radiation, and is therefore difficult \ +to detect.[1]\ +\ +"; + +// #[nuclei::main] +fn main() -> io::Result<()> { + let nuclei_config = NucleiConfig { + // Other options for IO_URING are: + // * low_latency_driven, + // * kernel_poll_only + // * io_poll + iouring: IoUringConfiguration::interrupt_driven(1 << 11), + }; + let _ = Proactor::with_config(nuclei_config); + + // Approximately ~75,9 MB + let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n"); + + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("data"); + path.push("dark-matter"); + + drive(async { + // Approximately ~75,9 MB + let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n"); + + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("data"); + path.push("dark-matter"); + + let fo = OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .unwrap(); + let mut file = Handle::::new(fo).unwrap(); + file.write_all(dark_matter.as_bytes()).await.unwrap(); + + let mut buf = vec![]; + assert!(file.seek(SeekFrom::Start(0)).await.is_ok()); + assert_eq!(file.read_to_end(&mut buf).await.unwrap(), dark_matter.len()); + assert_eq!(&buf[0..dark_matter.len()], dark_matter.as_bytes()); + + println!("Length of file is {}", buf.len()); + }); + + Ok(()) +} diff --git a/src/async_io.rs b/src/async_io.rs index 142cbb7..c37691c 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -209,9 +209,9 @@ impl AsyncRead for Handle { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - let mut store = &mut self.get_mut().store_file; + let store = &mut self.get_mut().store_file; - if let Some(mut store_file) = store.as_mut() { + if let Some(store_file) = store.as_mut() { let fd: RawFd = store_file.receive_fd(); let op_state = store_file.op_state(); let (_, pos) = store_file.bufpair(); @@ -239,10 +239,10 @@ const NON_READ: &[u8] = &[]; #[cfg(all(feature = "iouring", target_os = "linux"))] impl AsyncBufRead for Handle { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut store = &mut self.get_mut().store_file; + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let store = &mut self.get_mut().store_file; - if let Some(mut store_file) = store.as_mut() { + if let Some(store_file) = store.as_mut() { let fd: RawFd = store_file.receive_fd(); let op_state = store_file.op_state(); let (bufp, pos) = store_file.bufpair(); @@ -267,7 +267,7 @@ impl AsyncBufRead for Handle { } fn consume(self: Pin<&mut Self>, amt: usize) { - let mut store = self.get_mut().store_file.as_mut().unwrap(); + let store = self.get_mut().store_file.as_mut().unwrap(); store.buf().consume(amt); } } @@ -279,9 +279,9 @@ impl AsyncWrite for Handle { cx: &mut Context<'_>, bufslice: &[u8], ) -> Poll> { - let mut store = &mut self.get_mut().store_file; + let store = &mut self.get_mut().store_file; - if let Some(mut store_file) = store.as_mut() { + if let Some(store_file) = store.as_mut() { let fd: RawFd = store_file.receive_fd(); let op_state = store_file.op_state(); let (bufp, pos) = store_file.bufpair(); @@ -319,9 +319,9 @@ impl AsyncWrite for Handle { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - let mut store = &mut self.get_mut().store_file; + let store = &mut self.get_mut().store_file; - if let Some(mut store_file) = store.as_mut() { + if let Some(store_file) = store.as_mut() { let fd: RawFd = store_file.receive_fd(); let op_state = store_file.op_state(); let (_, pos) = store_file.bufpair(); @@ -349,9 +349,9 @@ impl AsyncWrite for Handle { } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut store = &mut self.get_mut().store_file; + let store = &mut self.get_mut().store_file; - if let Some(mut store_file) = store.as_mut() { + if let Some(store_file) = store.as_mut() { let fd: RawFd = store_file.receive_fd(); let op_state = store_file.op_state(); @@ -377,7 +377,7 @@ impl AsyncSeek for Handle { cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { - let mut store = &mut self.get_mut().store_file.as_mut().unwrap(); + let store = &mut self.get_mut().store_file.as_mut().unwrap(); let (cursor, offset) = match pos { io::SeekFrom::Start(n) => { @@ -392,7 +392,7 @@ impl AsyncSeek for Handle { } }; let valid_seek = if offset.is_negative() { - match cursor.checked_sub(offset.abs() as usize) { + match cursor.checked_sub(offset.unsigned_abs() as usize) { Some(valid_seek) => valid_seek, None => { let invalid = io::Error::from(io::ErrorKind::InvalidInput); diff --git a/src/config.rs b/src/config.rs index 15fa525..05417e7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,9 +55,80 @@ pub struct IoUringConfiguration { /// If [None] passed unbounded workers will be limited by the process task limit, /// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit. pub per_numa_unbounded_worker_count: Option, + + /// This argument allows aggressively waiting on CQ(completion queue) to have low latency of IO completions. + /// Basically, this argument polls CQEs(completion queue events) directly on cq at userspace. + /// Mind that, using this increase pressure on CPUs from userspace side. By default, nuclei reaps the CQ with + /// aggressive wait. This is double polling approach for nuclei where Kernel gets submissions by itself (SQPOLL), + /// processes it and puts completions to completion queue and we immediately pick up without latency + /// (aggressive_poll). + /// + /// **[default]**: `true`. + pub aggressive_poll: bool, + + /// Perform busy-waiting for I/O completion events, as opposed to getting notifications via an + /// asynchronous IRQ (Interrupt Request). This will reduce latency, but increases CPU usage. + /// This is only usable on file systems that support polling and files opened with `O_DIRECT`. + /// + /// **[default]**: `false`. + pub iopoll_enabled: bool, // XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage. } +impl IoUringConfiguration { + /// + /// Standard way to use IO_URING. No polling, purely IRQ awaken IO completion. + /// This is a normal way to process IO, mind that with this approach + /// `actual completion time != userland reception of completion`. + /// Throughput is low compared to all the other config alternatives. + /// + /// **NOTE:** If you don't know what to select as configuration, please select this. + pub fn interrupt_driven(queue_len: u32) -> Self { + Self { + queue_len, + sqpoll_wake_interval: None, + per_numa_bounded_worker_count: None, + per_numa_unbounded_worker_count: None, + aggressive_poll: false, + iopoll_enabled: false, + } + } + + /// + /// Low Latency Driven version of IO_URING, where it is suitable for high traffic environments. + /// High throughput low latency solution where it consumes a lot of resources. + pub fn low_latency_driven(queue_len: u32) -> Self { + Self { + queue_len, + sqpoll_wake_interval: Some(2), + aggressive_poll: true, + ..Self::default() + } + } + + /// + /// Kernel poll only version of IO_URING, where it is suitable for high traffic environments. + /// This version won't allow aggressive polling on completion queue(CQ). + pub fn kernel_poll_only(queue_len: u32) -> Self { + Self { + queue_len, + sqpoll_wake_interval: Some(2), + aggressive_poll: true, + ..Self::default() + } + } + + /// + /// IOPOLL enabled ring configuration for operating on files with low-latency. + pub fn io_poll(queue_len: u32) -> Self { + Self { + queue_len, + iopoll_enabled: true, + ..Self::default() + } + } +} + impl Default for IoUringConfiguration { fn default() -> Self { Self { @@ -65,6 +136,8 @@ impl Default for IoUringConfiguration { sqpoll_wake_interval: Some(2), per_numa_bounded_worker_count: Some(1 << 8), per_numa_unbounded_worker_count: Some(1 << 9), + aggressive_poll: true, + iopoll_enabled: false, } } } diff --git a/src/proactor.rs b/src/proactor.rs index 25e9424..08f04be 100644 --- a/src/proactor.rs +++ b/src/proactor.rs @@ -1,10 +1,10 @@ -use std::ops::DerefMut; + use std::task::{Context, Poll}; use std::time::Duration; use std::{future::Future, io}; use crate::config::NucleiConfig; -use once_cell::sync::{Lazy, OnceCell}; +use once_cell::sync::{OnceCell}; use super::syscore::*; use super::waker::*; @@ -25,7 +25,7 @@ impl Proactor { /// Returns a reference to the proactor. pub fn get() -> &'static Proactor { unsafe { - &PROACTOR.get_or_init(|| { + PROACTOR.get_or_init(|| { Proactor( SysProactor::new(NucleiConfig::default()) .expect("cannot initialize IO backend"), @@ -37,15 +37,14 @@ impl Proactor { /// Builds a proactor instance with given config and returns a reference to it. pub fn with_config(config: NucleiConfig) -> &'static Proactor { unsafe { - let mut proactor = + let proactor = Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend")); PROACTOR .set(proactor) .map_err(|e| "Proactor instance not being able to set.") .unwrap(); - let proactor = - Proactor(SysProactor::new(config).expect("cannot initialize IO backend")); - &PROACTOR.get_or_init(|| proactor) + + PROACTOR.wait() } } @@ -133,6 +132,7 @@ mod proactor_tests { sqpoll_wake_interval: Some(11), per_numa_bounded_worker_count: Some(12), per_numa_unbounded_worker_count: Some(13), + ..IoUringConfiguration::default() }, }; let new = Proactor::with_config(config); diff --git a/src/syscore/linux/iouring/fs/buffer.rs b/src/syscore/linux/iouring/fs/buffer.rs index f216459..b1a5b90 100644 --- a/src/syscore/linux/iouring/fs/buffer.rs +++ b/src/syscore/linux/iouring/fs/buffer.rs @@ -47,7 +47,7 @@ impl Buffer { unsafe { let data: *mut u8 = self.data.cast().as_ptr(); let cap = self.cap - self.pos; - slice::from_raw_parts(data.offset(self.pos as isize), cap as usize) + slice::from_raw_parts(data.add(self.pos), cap) } } else { &[] @@ -81,7 +81,7 @@ impl Buffer { #[inline(always)] pub fn consume(&mut self, amt: usize) { - self.pos = cmp::min(self.pos + amt as usize, self.cap); + self.pos = cmp::min(self.pos + amt, self.cap); } #[inline(always)] diff --git a/src/syscore/linux/iouring/fs/mod.rs b/src/syscore/linux/iouring/fs/mod.rs index 4fbf5a6..1e31396 100644 --- a/src/syscore/linux/iouring/fs/mod.rs +++ b/src/syscore/linux/iouring/fs/mod.rs @@ -2,6 +2,6 @@ pub(crate) mod buffer; pub(crate) mod cancellation; pub(crate) mod store_file; -pub(crate) use buffer::*; -pub(crate) use cancellation::*; + + pub(crate) use store_file::*; diff --git a/src/syscore/linux/iouring/fs/store_file.rs b/src/syscore/linux/iouring/fs/store_file.rs index 2009020..face461 100644 --- a/src/syscore/linux/iouring/fs/store_file.rs +++ b/src/syscore/linux/iouring/fs/store_file.rs @@ -1,16 +1,16 @@ -use crate::Handle; -use lever::sync::prelude::TTas; + + use std::fs::File; use std::io; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::os::unix::io::{FromRawFd, RawFd}; use std::pin::Pin; use std::sync::Arc; use super::buffer::Buffer; use crate::syscore::Processor; use lever::sync::atomics::AtomicBox; -use pin_utils::unsafe_pinned; -use std::task::{Context, Poll}; + + pub struct StoreFile { fd: RawFd, @@ -87,7 +87,7 @@ impl StoreFile { &mut self.pos } - pub(crate) fn guard_op(self: &mut Self, op: Op) { + pub(crate) fn guard_op(&mut self, op: Op) { // let this = unsafe { Pin::get_unchecked_mut(self) }; // if *self.op_state.get() != Op::Pending && *self.op_state.get() != op { // self.cancel(); diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index c243995..b0ec1f6 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -1,14 +1,14 @@ use ahash::{HashMap, HashMapExt}; use core::mem::MaybeUninit; -use futures::channel::oneshot; + use lever::sync::prelude::*; use pin_utils::unsafe_pinned; use std::future::Future; -use std::hash::BuildHasherDefault; + use std::io; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::os::unix::io::{AsRawFd}; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use std::time::Duration; @@ -28,7 +28,7 @@ macro_rules! syscall { use crate::config::NucleiConfig; use crossbeam_channel::{unbounded, Receiver, Sender}; -use rustix::io_uring::IoringOp; + use rustix_uring::cqueue::{more, sock_nonempty}; use rustix_uring::{ cqueue::Entry as CQEntry, squeue::Entry as SQEntry, CompletionQueue, IoUring, SubmissionQueue, @@ -137,15 +137,16 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result } /////////////////// -//// uring impl +/// uring impl /////////////////// pub struct SysProactor { pub(crate) sq: TTas>, pub(crate) cq: TTas>, - sbmt: TTas>, + sbmt: Submitter<'static>, submitters: TTas>>, submitter_id: AtomicU64, + aggressive_poll: bool, } pub type RingTypes = ( @@ -164,37 +165,40 @@ impl SysProactor { .iouring .sqpoll_wake_interval .map(|e| rb.setup_sqpoll(e)); - let mut ring = rb + if config.iouring.iopoll_enabled { + rb.setup_iopoll(); + } + let ring = rb .build(config.iouring.queue_len) .expect("nuclei: uring can't be initialized"); IO_URING = Some(ring); - let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split(); + let (sbmt, sq, cq) = IO_URING.as_mut().unwrap().split(); match ( config.iouring.per_numa_bounded_worker_count, config.iouring.per_numa_unbounded_worker_count, ) { - (Some(bw), Some(ubw)) => submitter.register_iowq_max_workers(&mut [bw, ubw])?, - (None, Some(ubw)) => submitter.register_iowq_max_workers(&mut [0, ubw])?, - (Some(bw), None) => submitter.register_iowq_max_workers(&mut [bw, 0])?, - (None, None) => submitter.register_iowq_max_workers(&mut [0, 0])?, + (Some(bw), Some(ubw)) => sbmt.register_iowq_max_workers(&mut [bw, ubw])?, + (None, Some(ubw)) => sbmt.register_iowq_max_workers(&mut [0, ubw])?, + (Some(bw), None) => sbmt.register_iowq_max_workers(&mut [bw, 0])?, + (None, None) => sbmt.register_iowq_max_workers(&mut [0, 0])?, } Ok(SysProactor { sq: TTas::new(sq), cq: TTas::new(cq), - sbmt: TTas::new(submitter), + sbmt, submitters: TTas::new(HashMap::with_capacity(config.iouring.queue_len as usize)), submitter_id: AtomicU64::default(), + aggressive_poll: config.iouring.aggressive_poll, }) } } pub(crate) fn register_files_sparse(&self, n: u32) -> io::Result<()> { - let mut sbmt = self.sbmt.lock(); - Ok(sbmt.register_files_sparse(n)?) + Ok(self.sbmt.register_files_sparse(n)?) } pub(crate) fn register_io(&self, mut sqe: SQEntry) -> io::Result { @@ -214,9 +218,7 @@ impl SysProactor { sq.sync(); drop(sq); - let sbmt = self.sbmt.lock(); - sbmt.submit()?; - drop(sbmt); + self.sbmt.submit()?; Ok(CompletionChan { rx }) } @@ -235,8 +237,11 @@ impl SysProactor { // issue cas barrier 'sock: loop { + if !self.aggressive_poll { + self.sbmt.submit_and_wait(1)?; + } cq.sync(); - while let Some(cqe) = cq.next() { + for cqe in cq.by_ref() { if more(cqe.flags()) { self.cqe_completion_multi(&cqe)?; } else { @@ -257,7 +262,7 @@ impl SysProactor { let udata = cqe.user_data(); let res: i32 = cqe.result(); - let mut sbmts = self.submitters.lock(); + let sbmts = self.submitters.lock(); sbmts.get(&udata).map(|s| s.send(res)); // if atomics are going to be wrapped, channel will be reinserted. // which is ok. diff --git a/src/syscore/linux/iouring/mod.rs b/src/syscore/linux/iouring/mod.rs index 03e2ab1..5d72821 100644 --- a/src/syscore/linux/iouring/mod.rs +++ b/src/syscore/linux/iouring/mod.rs @@ -6,8 +6,8 @@ mod processor; pub(crate) use fs::*; pub(crate) use iouring::*; -pub(crate) use net::*; -pub(crate) use nethandle::*; + + pub(crate) use processor::*; pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::IoUring; diff --git a/src/syscore/linux/iouring/net/multishot.rs b/src/syscore/linux/iouring/net/multishot.rs index 91cc3ad..3aad50d 100644 --- a/src/syscore/linux/iouring/net/multishot.rs +++ b/src/syscore/linux/iouring/net/multishot.rs @@ -3,7 +3,7 @@ use crate::{Handle, Proactor}; use crossbeam_channel::RecvError; use futures::Stream; use pin_project_lite::pin_project; -use rustix::io_uring::{IoringOp, SocketFlags}; +use rustix::io_uring::{SocketFlags}; use rustix_uring::{opcode as OP, types::Fd}; use std::io; use std::net::TcpStream; @@ -23,7 +23,7 @@ pin_project! { impl TcpStreamGenerator { pub fn new(listener: &T) -> io::Result { - let mut sqe = OP::AcceptMulti::new(Fd(listener.as_raw_fd())) + let sqe = OP::AcceptMulti::new(Fd(listener.as_raw_fd())) .flags(SocketFlags::NONBLOCK) .build(); diff --git a/src/syscore/linux/iouring/nethandle.rs b/src/syscore/linux/iouring/nethandle.rs index cf7e154..8c2fa4c 100644 --- a/src/syscore/linux/iouring/nethandle.rs +++ b/src/syscore/linux/iouring/nethandle.rs @@ -1,4 +1,4 @@ -use std::future::Future; + use std::io; use std::marker::Unpin; use std::net::{SocketAddr, ToSocketAddrs}; @@ -16,9 +16,9 @@ use lever::sync::prelude::*; use super::Processor; use crate::syscore::linux::iouring::fs::store_file::StoreFile; use crate::syscore::linux::iouring::net::multishot::TcpStreamGenerator; -use crate::syscore::CompletionChan; -use crate::{Handle, Proactor}; -use std::fs::File; + +use crate::{Handle}; + impl Handle { pub fn new(io: T) -> io::Result> { @@ -41,7 +41,7 @@ impl Handle { let listener = TcpListener::bind(addr)?; listener.set_nonblocking(true)?; - Ok(Handle::new(listener)?) + Handle::new(listener) } /// @@ -71,7 +71,7 @@ impl Handle { impl Handle { pub async fn connect(sock_addrs: A) -> io::Result> { - Ok(Processor::processor_connect(sock_addrs, Processor::processor_connect_tcp).await?) + Processor::processor_connect(sock_addrs, Processor::processor_connect_tcp).await } pub async fn send(&self, buf: &[u8]) -> io::Result { @@ -89,7 +89,7 @@ impl Handle { impl Handle { pub fn bind(addr: A) -> io::Result> { - Ok(Handle::new(UdpSocket::bind(addr)?)?) + Handle::new(UdpSocket::bind(addr)?) } pub async fn connect(sock_addrs: A) -> io::Result> { @@ -129,7 +129,7 @@ impl Handle { impl Handle { pub fn bind>(path: P) -> io::Result> { - Ok(Handle::new(UnixListener::bind(path)?)?) + Handle::new(UnixListener::bind(path)?) } pub async fn accept(&self) -> io::Result<(Handle, UnixSocketAddr)> { @@ -149,7 +149,7 @@ impl Handle { impl Handle { pub async fn connect>(path: P) -> io::Result> { - Ok(Processor::processor_connect_unix(path).await?) + Processor::processor_connect_unix(path).await } pub fn pair() -> io::Result<(Handle, Handle)> { @@ -177,7 +177,7 @@ impl Handle { impl Handle { pub fn bind>(path: P) -> io::Result> { - Ok(Handle::new(UnixDatagram::bind(path)?)?) + Handle::new(UnixDatagram::bind(path)?) } pub fn pair() -> io::Result<(Handle, Handle)> { diff --git a/src/syscore/linux/iouring/processor.rs b/src/syscore/linux/iouring/processor.rs index d49510d..93d3c4b 100644 --- a/src/syscore/linux/iouring/processor.rs +++ b/src/syscore/linux/iouring/processor.rs @@ -1,16 +1,14 @@ use std::future::Future; -use std::io::{Read, Write}; + use std::net::TcpStream; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; -use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; -use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixStream}; use std::path::Path; use std::{ - fs::File, - mem::ManuallyDrop, os::unix::io::{AsRawFd, FromRawFd}, }; -use std::{io, mem}; +use std::{io}; use crate::proactor::Proactor; @@ -18,14 +16,14 @@ use crate::syscore::shim_to_af_unix; use crate::Handle; use libc::sockaddr_un; use os_socketaddr::OsSocketAddr; -use pin_utils::unsafe_pinned; + use rustix::io_uring::{ - msghdr, sockaddr, sockaddr_storage, IoringRecvFlags, RecvFlags, SendFlags, SocketFlags, + msghdr, IoringRecvFlags, RecvFlags, SendFlags, SocketFlags, }; -use rustix::net::{connect_unix, SocketAddrAny, SocketAddrStorage, SocketAddrUnix}; -use rustix_uring::opcode::RecvMsg; -use rustix_uring::squeue::Entry; -use rustix_uring::types::{socklen_t, AtFlags, Fixed, Mode, OFlags, Statx, StatxFlags}; +use rustix::net::{SocketAddrAny, SocketAddrUnix}; + + +use rustix_uring::types::{socklen_t, AtFlags, Mode, OFlags, Statx, StatxFlags}; use rustix_uring::{opcode as OP, types::Fd}; use socket2::SockAddr; use std::ffi::CString; @@ -58,7 +56,7 @@ impl Processor { let path = CString::new(path.as_ref().as_os_str().as_bytes()).expect("invalid path"); let path = path.as_ptr(); let dfd = libc::AT_FDCWD; - let mut sqe = OP::OpenAt::new(Fd(dfd.as_raw_fd()), path) + let sqe = OP::OpenAt::new(Fd(dfd.as_raw_fd()), path) .flags(OFlags::CLOEXEC | OFlags::RDONLY) .mode(Mode::from(0o666)) .build(); @@ -75,7 +73,7 @@ impl Processor { buf: &mut [u8], offset: usize, ) -> io::Result { - let mut sqe = OP::Read::new(Fd(*io), buf.as_mut_ptr(), buf.len() as _) + let sqe = OP::Read::new(Fd(*io), buf.as_mut_ptr(), buf.len() as _) .offset(offset as _) .build(); @@ -89,7 +87,7 @@ impl Processor { buf: &[u8], offset: usize, ) -> io::Result { - let mut sqe = OP::Write::new(Fd(*io), buf.as_ptr(), buf.len() as _) + let sqe = OP::Write::new(Fd(*io), buf.as_ptr(), buf.len() as _) .offset(offset as _) .build(); @@ -99,7 +97,7 @@ impl Processor { } pub(crate) async fn processor_close_file(io: &RawFd) -> io::Result { - let mut sqe = OP::Close::new(Fd(*io)).build(); + let sqe = OP::Close::new(Fd(*io)).build(); let cc = Proactor::get().inner().register_io(sqe)?; @@ -111,7 +109,7 @@ impl Processor { let flags = libc::AT_EMPTY_PATH; let mask = libc::STATX_SIZE; - let mut sqe = OP::Statx::new(Fd(*io), &EMPTY, statx) + let sqe = OP::Statx::new(Fd(*io), &EMPTY, statx) .flags(AtFlags::EMPTY_PATH) .mask(StatxFlags::SIZE) .build(); @@ -125,7 +123,7 @@ impl Processor { io: &RawFd, bufs: &mut [IoSliceMut<'_>], ) -> io::Result { - let mut sqe = OP::Readv::new(Fd(*io), bufs as *mut _ as *mut _, bufs.len() as _) + let sqe = OP::Readv::new(Fd(*io), bufs as *mut _ as *mut _, bufs.len() as _) .offset(0_u64) .build(); @@ -138,7 +136,7 @@ impl Processor { io: &RawFd, bufs: &[IoSlice<'_>], ) -> io::Result { - let mut sqe = OP::Writev::new(Fd(*io), bufs as *const _ as *const _, bufs.len() as _) + let sqe = OP::Writev::new(Fd(*io), bufs as *const _ as *const _, bufs.len() as _) .offset(0_u64) .build(); @@ -155,7 +153,7 @@ impl Processor { pub(crate) async fn processor_send(socket: &R, buf: &[u8]) -> io::Result { let fd = socket.as_raw_fd() as _; - let mut sqe = OP::Send::new(Fd(fd), buf.as_ptr() as _, buf.len() as _) + let sqe = OP::Send::new(Fd(fd), buf.as_ptr() as _, buf.len() as _) .flags(SendFlags::empty()) .build(); @@ -179,7 +177,7 @@ impl Processor { ) -> io::Result { let fd = socket.as_raw_fd() as _; - let mut sqe = OP::Recv::new(Fd(fd), buf.as_mut_ptr(), buf.len() as _) + let sqe = OP::Recv::new(Fd(fd), buf.as_mut_ptr(), buf.len() as _) .flags(flags) .build(); @@ -243,14 +241,14 @@ impl Processor { let nixsaddr = SockAddr::from(addr); - let mut stream = sock.into_tcp_stream(); + let stream = sock.into_tcp_stream(); stream.set_nodelay(true)?; let fd = stream.as_raw_fd() as _; - let mut ossa: OsSocketAddr = addr.into(); + let ossa: OsSocketAddr = addr.into(); let socklen = ossa.len(); - let mut sqe = OP::Connect::new( + let sqe = OP::Connect::new( Fd(fd), unsafe { std::mem::transmute(ossa.as_ptr()) }, socklen, @@ -259,7 +257,7 @@ impl Processor { Proactor::get().inner().register_io(sqe)?.await?; - Ok(Handle::new(stream)?) + Handle::new(stream) } pub(crate) async fn processor_connect_udp(addr: SocketAddr) -> io::Result> { @@ -293,7 +291,7 @@ impl Processor { sock.connect(&sockaddr)?; // Make into udp type and init handler. - Ok(Handle::new(sock.into_udp_socket())?) + Handle::new(sock.into_udp_socket()) } /////////////////////////////////// @@ -305,7 +303,7 @@ impl Processor { ) -> io::Result<(Handle, Option)> { let fd = listener.as_raw_fd() as _; - let mut sqe = OP::Accept::new(Fd(fd), null_mut(), null_mut()) + let sqe = OP::Accept::new(Fd(fd), null_mut(), null_mut()) .flags(SocketFlags::NONBLOCK) .build(); @@ -333,7 +331,7 @@ impl Processor { addr: &socket2::SockAddr, ) -> io::Result { // FIXME: (vcq): Wrap into vec? - let mut iov = IoSlice::new(buf); + let iov = IoSlice::new(buf); let mut sendmsg = unsafe { MaybeUninit::::zeroed().assume_init() }; sendmsg.msg_name = addr.as_ptr() as *mut _; @@ -343,7 +341,7 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let mut sqe = OP::SendMsg::new(Fd(fd), &sendmsg as *const _ as *const _) + let sqe = OP::SendMsg::new(Fd(fd), &sendmsg as *const _ as *const _) .flags(SendFlags::empty()) .build(); @@ -379,7 +377,7 @@ impl Processor { unsafe { MaybeUninit::::zeroed().assume_init() }; // FIXME: (vcq): Wrap into vec? - let mut iov = IoSliceMut::new(buf); + let iov = IoSliceMut::new(buf); let mut recvmsg = unsafe { MaybeUninit::::zeroed().assume_init() }; recvmsg.msg_name = &mut sockaddr_raw as *mut _ as _; @@ -389,7 +387,7 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let mut sqe = OP::RecvMsg::new(Fd(fd), &mut recvmsg as *mut _ as *mut _) + let sqe = OP::RecvMsg::new(Fd(fd), &mut recvmsg as *mut _ as *mut _) .flags(flags) .ioprio((IoringRecvFlags::POLL_FIRST | IoringRecvFlags::MULTISHOT).bits()) .build(); @@ -421,7 +419,7 @@ impl Processor { }; let mut natsockaddr: ShimSocketAddrUnix = unsafe { std::mem::transmute(sockaddr) }; - let mut sqe = OP::Accept::new( + let sqe = OP::Accept::new( Fd(fd), &mut natsockaddr.unix as *mut _ as *mut _, natsockaddr.len as _, @@ -453,19 +451,19 @@ impl Processor { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; // let sockaddr = socket2::SockAddr::unix(path)?; let sockaddr = SocketAddrUnix::new(path.as_ref())?; - let mut sockaddr: ShimSocketAddrUnix = unsafe { std::mem::transmute(sockaddr) }; + let sockaddr: ShimSocketAddrUnix = unsafe { std::mem::transmute(sockaddr) }; sock.set_nonblocking(true)?; let stream: UnixStream = sock.into_unix_stream(); let fd = stream.as_raw_fd() as _; - let mut sqe = + let sqe = OP::Connect::new(Fd(fd), &sockaddr.unix as *const _ as *const _, sockaddr.len).build(); Proactor::get().inner().register_io(sqe)?.await?; - Ok(Handle::new(stream)?) + Handle::new(stream) } pub(crate) async fn processor_send_to_unix>( From cba5e1f9779ad7f8d7a382783b25f45a43e60460 Mon Sep 17 00:00:00 2001 From: Theo Bulut Date: Fri, 26 Jan 2024 22:03:48 +0100 Subject: [PATCH 2/3] Reorganize docs --- src/syscore/linux/iouring/net/multishot.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/syscore/linux/iouring/net/multishot.rs b/src/syscore/linux/iouring/net/multishot.rs index 3aad50d..550bc26 100644 --- a/src/syscore/linux/iouring/net/multishot.rs +++ b/src/syscore/linux/iouring/net/multishot.rs @@ -11,9 +11,9 @@ use std::os::fd::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; -/// -/// TcpStream generator that is fed by multishot accept with multiple CQEs. pin_project! { + /// + /// TcpStream generator that is fed by multishot accept with multiple CQEs. #[derive(Clone)] pub struct TcpStreamGenerator { listener: RawFd, From 4ec18ee5fb3834582afd4a1abf2847fa3503ec43 Mon Sep 17 00:00:00 2001 From: Theo Bulut Date: Fri, 26 Jan 2024 22:04:07 +0100 Subject: [PATCH 3/3] Cargo fmt again --- src/proactor.rs | 3 +-- src/syscore/linux/iouring/fs/mod.rs | 2 -- src/syscore/linux/iouring/fs/store_file.rs | 4 ---- src/syscore/linux/iouring/iouring.rs | 2 +- src/syscore/linux/iouring/mod.rs | 1 - src/syscore/linux/iouring/net/multishot.rs | 2 +- src/syscore/linux/iouring/nethandle.rs | 4 +--- src/syscore/linux/iouring/processor.rs | 11 +++-------- 8 files changed, 7 insertions(+), 22 deletions(-) diff --git a/src/proactor.rs b/src/proactor.rs index 08f04be..3283291 100644 --- a/src/proactor.rs +++ b/src/proactor.rs @@ -1,10 +1,9 @@ - use std::task::{Context, Poll}; use std::time::Duration; use std::{future::Future, io}; use crate::config::NucleiConfig; -use once_cell::sync::{OnceCell}; +use once_cell::sync::OnceCell; use super::syscore::*; use super::waker::*; diff --git a/src/syscore/linux/iouring/fs/mod.rs b/src/syscore/linux/iouring/fs/mod.rs index 1e31396..c08a196 100644 --- a/src/syscore/linux/iouring/fs/mod.rs +++ b/src/syscore/linux/iouring/fs/mod.rs @@ -2,6 +2,4 @@ pub(crate) mod buffer; pub(crate) mod cancellation; pub(crate) mod store_file; - - pub(crate) use store_file::*; diff --git a/src/syscore/linux/iouring/fs/store_file.rs b/src/syscore/linux/iouring/fs/store_file.rs index face461..c80261f 100644 --- a/src/syscore/linux/iouring/fs/store_file.rs +++ b/src/syscore/linux/iouring/fs/store_file.rs @@ -1,5 +1,3 @@ - - use std::fs::File; use std::io; use std::os::unix::io::{FromRawFd, RawFd}; @@ -10,8 +8,6 @@ use super::buffer::Buffer; use crate::syscore::Processor; use lever::sync::atomics::AtomicBox; - - pub struct StoreFile { fd: RawFd, buf: Buffer, diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index b0ec1f6..d4f084a 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -6,7 +6,7 @@ use pin_utils::unsafe_pinned; use std::future::Future; use std::io; -use std::os::unix::io::{AsRawFd}; +use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; diff --git a/src/syscore/linux/iouring/mod.rs b/src/syscore/linux/iouring/mod.rs index 5d72821..e07a8a8 100644 --- a/src/syscore/linux/iouring/mod.rs +++ b/src/syscore/linux/iouring/mod.rs @@ -7,7 +7,6 @@ mod processor; pub(crate) use fs::*; pub(crate) use iouring::*; - pub(crate) use processor::*; pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::IoUring; diff --git a/src/syscore/linux/iouring/net/multishot.rs b/src/syscore/linux/iouring/net/multishot.rs index 550bc26..0f1be52 100644 --- a/src/syscore/linux/iouring/net/multishot.rs +++ b/src/syscore/linux/iouring/net/multishot.rs @@ -3,7 +3,7 @@ use crate::{Handle, Proactor}; use crossbeam_channel::RecvError; use futures::Stream; use pin_project_lite::pin_project; -use rustix::io_uring::{SocketFlags}; +use rustix::io_uring::SocketFlags; use rustix_uring::{opcode as OP, types::Fd}; use std::io; use std::net::TcpStream; diff --git a/src/syscore/linux/iouring/nethandle.rs b/src/syscore/linux/iouring/nethandle.rs index 8c2fa4c..e2e0d03 100644 --- a/src/syscore/linux/iouring/nethandle.rs +++ b/src/syscore/linux/iouring/nethandle.rs @@ -1,4 +1,3 @@ - use std::io; use std::marker::Unpin; use std::net::{SocketAddr, ToSocketAddrs}; @@ -17,8 +16,7 @@ use super::Processor; use crate::syscore::linux::iouring::fs::store_file::StoreFile; use crate::syscore::linux::iouring::net::multishot::TcpStreamGenerator; -use crate::{Handle}; - +use crate::Handle; impl Handle { pub fn new(io: T) -> io::Result> { diff --git a/src/syscore/linux/iouring/processor.rs b/src/syscore/linux/iouring/processor.rs index 93d3c4b..24399f1 100644 --- a/src/syscore/linux/iouring/processor.rs +++ b/src/syscore/linux/iouring/processor.rs @@ -1,14 +1,12 @@ use std::future::Future; +use std::io; use std::net::TcpStream; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; use std::net::{SocketAddr, ToSocketAddrs}; +use std::os::unix::io::{AsRawFd, FromRawFd}; use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixStream}; use std::path::Path; -use std::{ - os::unix::io::{AsRawFd, FromRawFd}, -}; -use std::{io}; use crate::proactor::Proactor; @@ -17,12 +15,9 @@ use crate::Handle; use libc::sockaddr_un; use os_socketaddr::OsSocketAddr; -use rustix::io_uring::{ - msghdr, IoringRecvFlags, RecvFlags, SendFlags, SocketFlags, -}; +use rustix::io_uring::{msghdr, IoringRecvFlags, RecvFlags, SendFlags, SocketFlags}; use rustix::net::{SocketAddrAny, SocketAddrUnix}; - use rustix_uring::types::{socklen_t, AtFlags, Mode, OFlags, Statx, StatxFlags}; use rustix_uring::{opcode as OP, types::Fd}; use socket2::SockAddr;