diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index 7835cf4f764e..cb58d44168fc 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -158,6 +158,8 @@ struct SSConfig { udp_timeout: Option, #[serde(skip_serializing_if = "Option::is_none")] udp_max_associations: Option, + #[serde(skip_serializing_if = "Option::is_none")] + udp_mtu: Option, #[serde(skip_serializing_if = "Option::is_none", alias = "shadowsocks")] servers: Option>, @@ -1228,6 +1230,10 @@ pub struct Config { pub udp_timeout: Option, /// Maximum number of UDP Associations, default is unconfigured pub udp_max_associations: Option, + /// Maximum Transmission Unit (MTU) size for UDP packets + /// 65535 by default. Suggestion: 1500 + /// NOTE: mtu includes IP header, UDP header, UDP payload + pub udp_mtu: Option, /// ACL configuration (Global) /// @@ -1353,6 +1359,7 @@ impl Config { udp_timeout: None, udp_max_associations: None, + udp_mtu: None, acl: None, @@ -2056,6 +2063,9 @@ impl Config { // Maximum associations to be kept simultaneously nconfig.udp_max_associations = config.udp_max_associations; + // MTU for UDP + nconfig.udp_mtu = config.udp_mtu; + // RLIMIT_NOFILE #[cfg(all(unix, not(target_os = "android")))] { @@ -2764,6 +2774,8 @@ impl fmt::Display for Config { jconf.udp_max_associations = self.udp_max_associations; + jconf.udp_mtu = self.udp_mtu; + #[cfg(all(unix, not(target_os = "android")))] { jconf.nofile = self.nofile; diff --git a/crates/shadowsocks-service/src/local/http/http_stream.rs b/crates/shadowsocks-service/src/local/http/http_stream.rs index e0616bf3a7bf..ef2c2d448ccd 100644 --- a/crates/shadowsocks-service/src/local/http/http_stream.rs +++ b/crates/shadowsocks-service/src/local/http/http_stream.rs @@ -64,8 +64,7 @@ impl ProxyHttpStream { use once_cell::sync::Lazy; use std::sync::Arc; use tokio_rustls::{ - rustls::pki_types::ServerName, - rustls::{ClientConfig, RootCertStore}, + rustls::{pki_types::ServerName, ClientConfig, RootCertStore}, TlsConnector, }; diff --git a/crates/shadowsocks-service/src/local/http/server.rs b/crates/shadowsocks-service/src/local/http/server.rs index c3919b04e9b3..4e7b6a65d8f6 100644 --- a/crates/shadowsocks-service/src/local/http/server.rs +++ b/crates/shadowsocks-service/src/local/http/server.rs @@ -13,7 +13,9 @@ use tokio::{ }; use crate::local::{ - context::ServiceContext, loadbalancing::PingBalancer, net::tcp::listener::create_standard_tcp_listener, + context::ServiceContext, + loadbalancing::PingBalancer, + net::tcp::listener::create_standard_tcp_listener, }; use super::{http_client::HttpClient, http_service::HttpService, tokio_rt::TokioIo}; diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index f5b5c1e0029a..b274874ade9c 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -151,6 +151,7 @@ impl Server { connect_opts.tcp.fastopen = config.fast_open; connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT)); connect_opts.tcp.mptcp = config.mptcp; + connect_opts.udp.mtu = config.udp_mtu; context.set_connect_opts(connect_opts); let mut accept_opts = AcceptOpts { @@ -163,6 +164,7 @@ impl Server { accept_opts.tcp.fastopen = config.fast_open; accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT)); accept_opts.tcp.mptcp = config.mptcp; + accept_opts.udp.mtu = config.udp_mtu; context.set_accept_opts(accept_opts); if let Some(resolver) = build_dns_resolver( diff --git a/crates/shadowsocks-service/src/local/socks/server/server.rs b/crates/shadowsocks-service/src/local/socks/server/server.rs index 2afd6e327528..83eb4d319ff2 100644 --- a/crates/shadowsocks-service/src/local/socks/server/server.rs +++ b/crates/shadowsocks-service/src/local/socks/server/server.rs @@ -7,7 +7,9 @@ use tokio::{net::TcpStream, time}; #[cfg(feature = "local-http")] use crate::local::http::HttpConnectionHandler; use crate::local::{ - context::ServiceContext, loadbalancing::PingBalancer, net::tcp::listener::create_standard_tcp_listener, + context::ServiceContext, + loadbalancing::PingBalancer, + net::tcp::listener::create_standard_tcp_listener, socks::config::Socks5AuthConfig, }; diff --git a/crates/shadowsocks-service/src/manager/mod.rs b/crates/shadowsocks-service/src/manager/mod.rs index 1aea53824088..94b40dadebb7 100644 --- a/crates/shadowsocks-service/src/manager/mod.rs +++ b/crates/shadowsocks-service/src/manager/mod.rs @@ -52,6 +52,7 @@ pub async fn run(config: Config) -> io::Result<()> { connect_opts.tcp.fastopen = config.fast_open; connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT)); connect_opts.tcp.mptcp = config.mptcp; + connect_opts.udp.mtu = config.udp_mtu; let mut accept_opts = AcceptOpts { ipv6_only: config.ipv6_only, @@ -63,6 +64,7 @@ pub async fn run(config: Config) -> io::Result<()> { accept_opts.tcp.fastopen = config.fast_open; accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT)); accept_opts.tcp.mptcp = config.mptcp; + accept_opts.udp.mtu = config.udp_mtu; if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, config.dns_cache_size, &connect_opts).await diff --git a/crates/shadowsocks-service/src/server/mod.rs b/crates/shadowsocks-service/src/server/mod.rs index 738e4c91d99d..9910b5b26292 100644 --- a/crates/shadowsocks-service/src/server/mod.rs +++ b/crates/shadowsocks-service/src/server/mod.rs @@ -83,6 +83,7 @@ pub async fn run(config: Config) -> io::Result<()> { connect_opts.tcp.fastopen = config.fast_open; connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT)); connect_opts.tcp.mptcp = config.mptcp; + connect_opts.udp.mtu = config.udp_mtu; let mut accept_opts = AcceptOpts { ipv6_only: config.ipv6_only, @@ -94,6 +95,7 @@ pub async fn run(config: Config) -> io::Result<()> { accept_opts.tcp.fastopen = config.fast_open; accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT)); accept_opts.tcp.mptcp = config.mptcp; + accept_opts.udp.mtu = config.udp_mtu; let resolver = build_dns_resolver(config.dns, config.ipv6_first, config.dns_cache_size, &connect_opts) .await diff --git a/crates/shadowsocks/src/net/option.rs b/crates/shadowsocks/src/net/option.rs index 231b98662d36..d73ea9c29182 100644 --- a/crates/shadowsocks/src/net/option.rs +++ b/crates/shadowsocks/src/net/option.rs @@ -30,6 +30,15 @@ pub struct TcpSocketOpts { pub mptcp: bool, } +/// Options for UDP server +#[derive(Debug, Clone, Default)] +pub struct UdpSocketOpts { + /// Maximum Transmission Unit (MTU) for UDP socket `recv` + /// + /// NOTE: MTU includes IP header, UDP header, UDP payload + pub mtu: Option, +} + /// Options for connecting to remote server #[derive(Debug, Clone, Default)] pub struct ConnectOpts { @@ -58,6 +67,9 @@ pub struct ConnectOpts { /// TCP options pub tcp: TcpSocketOpts, + + /// UDP options + pub udp: UdpSocketOpts, } /// Inbound connection options @@ -66,6 +78,9 @@ pub struct AcceptOpts { /// TCP options pub tcp: TcpSocketOpts, + /// UDP options + pub udp: UdpSocketOpts, + /// Enable IPV6_V6ONLY option for socket pub ipv6_only: bool, } diff --git a/crates/shadowsocks/src/net/udp.rs b/crates/shadowsocks/src/net/udp.rs index afce737c7da2..ca751b7b1d55 100644 --- a/crates/shadowsocks/src/net/udp.rs +++ b/crates/shadowsocks/src/net/udp.rs @@ -34,6 +34,7 @@ use pin_project::pin_project; target_os = "freebsd" ))] use tokio::io::Interest; +use tokio::{io::ReadBuf, net::ToSocketAddrs}; use crate::{context::Context, relay::socks5::Address, ServerAddr}; @@ -72,9 +73,21 @@ pub struct BatchRecvMessage<'a> { pub data_len: usize, } +#[inline] +fn make_mtu_error(packet_size: usize, mtu: usize) -> io::Error { + io::Error::new( + io::ErrorKind::Other, + format!("UDP packet {} > MTU {}", packet_size, mtu), + ) +} + /// Wrappers for outbound `UdpSocket` #[pin_project] -pub struct UdpSocket(#[pin] tokio::net::UdpSocket); +pub struct UdpSocket { + #[pin] + socket: tokio::net::UdpSocket, + mtu: Option, +} impl UdpSocket { /// Connects to shadowsocks server @@ -98,7 +111,10 @@ impl UdpSocket { } }; - Ok(UdpSocket(socket)) + Ok(UdpSocket { + socket, + mtu: opts.udp.mtu, + }) } /// Connects to proxy target @@ -122,24 +138,38 @@ impl UdpSocket { } }; - Ok(UdpSocket(socket)) + Ok(UdpSocket { + socket, + mtu: opts.udp.mtu, + }) } /// Connects to shadowsocks server pub async fn connect_with_opts(addr: &SocketAddr, opts: &ConnectOpts) -> io::Result { let socket = create_outbound_udp_socket(From::from(addr), opts).await?; socket.connect(addr).await?; - Ok(UdpSocket(socket)) + Ok(UdpSocket { + socket, + mtu: opts.udp.mtu, + }) } /// Binds to a specific address with opts pub async fn connect_any_with_opts>(af: AF, opts: &ConnectOpts) -> io::Result { - create_outbound_udp_socket(af.into(), opts).await.map(UdpSocket) + create_outbound_udp_socket(af.into(), opts) + .await + .map(|socket| UdpSocket { + socket, + mtu: opts.udp.mtu, + }) } /// Binds to a specific address with opts as an outbound socket pub async fn bind_with_opts(addr: &SocketAddr, opts: &ConnectOpts) -> io::Result { - bind_outbound_udp_socket(addr, opts).await.map(UdpSocket) + bind_outbound_udp_socket(addr, opts).await.map(|socket| UdpSocket { + socket, + mtu: opts.udp.mtu, + }) } /// Binds to a specific address (inbound) @@ -151,7 +181,116 @@ impl UdpSocket { /// Binds to a specific address (inbound) pub async fn listen_with_opts(addr: &SocketAddr, opts: AcceptOpts) -> io::Result { let socket = create_inbound_udp_socket(addr, opts.ipv6_only).await?; - Ok(UdpSocket(socket)) + Ok(UdpSocket { + socket, + mtu: opts.udp.mtu, + }) + } + + /// Wrapper of `UdpSocket::poll_send` + pub fn poll_send(&self, cx: &mut TaskContext<'_>, buf: &[u8]) -> Poll> { + // Check MTU + if let Some(mtu) = self.mtu { + if buf.len() > mtu { + return Err(make_mtu_error(buf.len(), mtu)).into(); + } + } + + self.socket.poll_send(cx, buf) + } + + /// Wrapper of `UdpSocket::send` + #[inline] + pub async fn send(&self, buf: &[u8]) -> io::Result { + // Check MTU + if let Some(mtu) = self.mtu { + if buf.len() > mtu { + return Err(make_mtu_error(buf.len(), mtu)).into(); + } + } + + self.socket.send(buf).await + } + + /// Wrapper of `UdpSocket::poll_send_to` + pub fn poll_send_to(&self, cx: &mut TaskContext<'_>, buf: &[u8], target: SocketAddr) -> Poll> { + // Check MTU + if let Some(mtu) = self.mtu { + if buf.len() > mtu { + return Err(make_mtu_error(buf.len(), mtu)).into(); + } + } + + self.socket.poll_send_to(cx, buf, target) + } + + /// Wrapper of `UdpSocket::send_to` + #[inline] + pub async fn send_to(&self, buf: &[u8], target: A) -> io::Result { + // Check MTU + if let Some(mtu) = self.mtu { + if buf.len() > mtu { + return Err(make_mtu_error(buf.len(), mtu)).into(); + } + } + + self.socket.send_to(buf, target).await + } + + /// Wrapper of `UdpSocket::poll_recv` + #[inline] + pub fn poll_recv(&self, cx: &mut TaskContext<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + ready!(self.socket.poll_recv(cx, buf))?; + + if let Some(mtu) = self.mtu { + if buf.filled().len() > mtu { + return Err(make_mtu_error(buf.filled().len(), mtu)).into(); + } + } + + Ok(()).into() + } + + /// Wrapper of `UdpSocket::recv` + #[inline] + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + let n = self.socket.recv(buf).await?; + + if let Some(mtu) = self.mtu { + if n > mtu { + return Err(make_mtu_error(n, mtu)); + } + } + + Ok(n) + } + + /// Wrapper of `UdpSocket::poll_recv_from` + #[inline] + pub fn poll_recv_from(&self, cx: &mut TaskContext<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let addr = ready!(self.socket.poll_recv_from(cx, buf))?; + + if let Some(mtu) = self.mtu { + if buf.filled().len() > mtu { + return Err(make_mtu_error(buf.filled().len(), mtu)).into(); + } + } + + Ok(addr).into() + } + + /// Wrapper of `UdpSocket::recv` + #[inline] + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + let (n, addr) = self.socket.recv_from(buf).await?; + + if let Some(mtu) = self.mtu { + if n > mtu { + return Err(make_mtu_error(n, mtu)); + } + } + + Ok((n, addr)) } /// Batch send packets @@ -170,9 +309,12 @@ impl UdpSocket { use super::sys::batch_sendmsg; loop { - ready!(self.0.poll_send_ready(cx))?; + ready!(self.socket.poll_send_ready(cx))?; - match self.0.try_io(Interest::WRITABLE, || batch_sendmsg(&self.0, msgs)) { + match self + .socket + .try_io(Interest::WRITABLE, || batch_sendmsg(&self.socket, msgs)) + { Ok(n) => return Ok(n).into(), Err(ref err) if err.kind() == ErrorKind::WouldBlock => {} Err(err) => return Err(err).into(), @@ -208,9 +350,12 @@ impl UdpSocket { use super::sys::batch_recvmsg; loop { - ready!(self.0.poll_recv_ready(cx))?; + ready!(self.socket.poll_recv_ready(cx))?; - match self.0.try_io(Interest::READABLE, || batch_recvmsg(&self.0, msgs)) { + match self + .socket + .try_io(Interest::READABLE, || batch_recvmsg(&self.socket, msgs)) + { Ok(n) => return Ok(n).into(), Err(ref err) if err.kind() == ErrorKind::WouldBlock => {} Err(err) => return Err(err).into(), @@ -235,24 +380,24 @@ impl Deref for UdpSocket { type Target = tokio::net::UdpSocket; fn deref(&self) -> &Self::Target { - &self.0 + &self.socket } } impl DerefMut for UdpSocket { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.socket } } impl From for UdpSocket { - fn from(s: tokio::net::UdpSocket) -> Self { - UdpSocket(s) + fn from(socket: tokio::net::UdpSocket) -> Self { + UdpSocket { socket, mtu: None } } } impl From for tokio::net::UdpSocket { fn from(s: UdpSocket) -> tokio::net::UdpSocket { - s.0 + s.socket } }