Skip to content

Commit

Permalink
feat: tun smoltcp thread given a name
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Feb 16, 2024
1 parent da9927c commit cc619c4
Showing 1 changed file with 143 additions and 140 deletions.
283 changes: 143 additions & 140 deletions crates/shadowsocks-service/src/local/tun/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,181 +295,184 @@ impl TcpTun {
let manager_handle = {
let manager_running = manager_running.clone();

thread::spawn(move || {
let TcpSocketManager {
ref mut device,
ref mut iface,
ref mut sockets,
ref mut socket_creation_rx,
..
} = manager;

let mut socket_set = SocketSet::new(vec![]);

while manager_running.load(Ordering::Relaxed) {
while let Ok(TcpSocketCreation {
control,
socket,
socket_created_tx: socket_create_tx,
}) = socket_creation_rx.try_recv()
{
let handle = socket_set.add(socket);
let _ = socket_create_tx.send(());
sockets.insert(handle, control);
}
thread::Builder::new()
.name("smoltcp-poll".to_owned())
.spawn(move || {
let TcpSocketManager {
ref mut device,
ref mut iface,
ref mut sockets,
ref mut socket_creation_rx,
..
} = manager;

let mut socket_set = SocketSet::new(vec![]);

while manager_running.load(Ordering::Relaxed) {
while let Ok(TcpSocketCreation {
control,
socket,
socket_created_tx: socket_create_tx,
}) = socket_creation_rx.try_recv()
{
let handle = socket_set.add(socket);
let _ = socket_create_tx.send(());
sockets.insert(handle, control);
}

let before_poll = SmolInstant::now();
let updated_sockets = iface.poll(before_poll, device, &mut socket_set);
let before_poll = SmolInstant::now();
let updated_sockets = iface.poll(before_poll, device, &mut socket_set);

if updated_sockets {
trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll);
}
if updated_sockets {
trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll);
}

// Check all the sockets' status
let mut sockets_to_remove = Vec::new();
// Check all the sockets' status
let mut sockets_to_remove = Vec::new();

for (socket_handle, control) in sockets.iter() {
let socket_handle = *socket_handle;
let socket = socket_set.get_mut::<TcpSocket>(socket_handle);
let mut control = control.lock();
for (socket_handle, control) in sockets.iter() {
let socket_handle = *socket_handle;
let socket = socket_set.get_mut::<TcpSocket>(socket_handle);
let mut control = control.lock();

// Remove the socket only when it is in the closed state.
if socket.state() == TcpState::Closed {
sockets_to_remove.push(socket_handle);
// Remove the socket only when it is in the closed state.
if socket.state() == TcpState::Closed {
sockets_to_remove.push(socket_handle);

control.send_state = TcpSocketState::Closed;
control.recv_state = TcpSocketState::Closed;
control.send_state = TcpSocketState::Closed;
control.recv_state = TcpSocketState::Closed;

if let Some(waker) = control.send_waker.take() {
waker.wake();
}
if let Some(waker) = control.recv_waker.take() {
waker.wake();
if let Some(waker) = control.send_waker.take() {
waker.wake();
}
if let Some(waker) = control.recv_waker.take() {
waker.wake();
}

trace!("closed TCP connection");
continue;
}

trace!("closed TCP connection");
continue;
}
// SHUT_WR
if matches!(control.send_state, TcpSocketState::Close) {
trace!("closing TCP Write Half, {:?}", socket.state());

// SHUT_WR
if matches!(control.send_state, TcpSocketState::Close) {
trace!("closing TCP Write Half, {:?}", socket.state());
// Close the socket. Set to FIN state
socket.close();
control.send_state = TcpSocketState::Closing;

// Close the socket. Set to FIN state
socket.close();
control.send_state = TcpSocketState::Closing;
// We can still process the pending buffer.
}

// We can still process the pending buffer.
}
// Check if readable
let mut wake_receiver = false;
while socket.can_recv() && !control.recv_buffer.is_full() {
let result = socket.recv(|buffer| {
let n = control.recv_buffer.enqueue_slice(buffer);
(n, ())
});

match result {
Ok(..) => {
wake_receiver = true;
}
Err(err) => {
error!("socket recv error: {:?}, {:?}", err, socket.state());

// Check if readable
let mut wake_receiver = false;
while socket.can_recv() && !control.recv_buffer.is_full() {
let result = socket.recv(|buffer| {
let n = control.recv_buffer.enqueue_slice(buffer);
(n, ())
});

match result {
Ok(..) => {
wake_receiver = true;
}
Err(err) => {
error!("socket recv error: {:?}, {:?}", err, socket.state());
// Don't know why. Abort the connection.
socket.abort();

// Don't know why. Abort the connection.
socket.abort();
if matches!(control.recv_state, TcpSocketState::Normal) {
control.recv_state = TcpSocketState::Closed;
}
wake_receiver = true;

if matches!(control.recv_state, TcpSocketState::Normal) {
control.recv_state = TcpSocketState::Closed;
// The socket will be recycled in the next poll.
break;
}
wake_receiver = true;

// The socket will be recycled in the next poll.
break;
}
}
}

// If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2,
// the local client have closed our receiver.
if matches!(control.recv_state, TcpSocketState::Normal)
&& !socket.may_recv()
&& !matches!(
socket.state(),
TcpState::Listen
| TcpState::SynReceived
| TcpState::Established
| TcpState::FinWait1
| TcpState::FinWait2
)
{
trace!("closed TCP Read Half, {:?}", socket.state());

// Let TcpConnection::poll_read returns EOF.
control.recv_state = TcpSocketState::Closed;
wake_receiver = true;
}

if wake_receiver && control.recv_waker.is_some() {
if let Some(waker) = control.recv_waker.take() {
waker.wake();
// If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2,
// the local client have closed our receiver.
if matches!(control.recv_state, TcpSocketState::Normal)
&& !socket.may_recv()
&& !matches!(
socket.state(),
TcpState::Listen
| TcpState::SynReceived
| TcpState::Established
| TcpState::FinWait1
| TcpState::FinWait2
)
{
trace!("closed TCP Read Half, {:?}", socket.state());

// Let TcpConnection::poll_read returns EOF.
control.recv_state = TcpSocketState::Closed;
wake_receiver = true;
}
}

// Check if writable
let mut wake_sender = false;
while socket.can_send() && !control.send_buffer.is_empty() {
let result = socket.send(|buffer| {
let n = control.send_buffer.dequeue_slice(buffer);
(n, ())
});

match result {
Ok(..) => {
wake_sender = true;
if wake_receiver && control.recv_waker.is_some() {
if let Some(waker) = control.recv_waker.take() {
waker.wake();
}
Err(err) => {
error!("socket send error: {:?}, {:?}", err, socket.state());

// Don't know why. Abort the connection.
socket.abort();
}

if matches!(control.send_state, TcpSocketState::Normal) {
control.send_state = TcpSocketState::Closed;
// Check if writable
let mut wake_sender = false;
while socket.can_send() && !control.send_buffer.is_empty() {
let result = socket.send(|buffer| {
let n = control.send_buffer.dequeue_slice(buffer);
(n, ())
});

match result {
Ok(..) => {
wake_sender = true;
}
wake_sender = true;
Err(err) => {
error!("socket send error: {:?}, {:?}", err, socket.state());

// Don't know why. Abort the connection.
socket.abort();

if matches!(control.send_state, TcpSocketState::Normal) {
control.send_state = TcpSocketState::Closed;
}
wake_sender = true;

// The socket will be recycled in the next poll.
break;
// The socket will be recycled in the next poll.
break;
}
}
}
}

if wake_sender && control.send_waker.is_some() {
if let Some(waker) = control.send_waker.take() {
waker.wake();
if wake_sender && control.send_waker.is_some() {
if let Some(waker) = control.send_waker.take() {
waker.wake();
}
}
}
}

for socket_handle in sockets_to_remove {
sockets.remove(&socket_handle);
socket_set.remove(socket_handle);
}
for socket_handle in sockets_to_remove {
sockets.remove(&socket_handle);
socket_set.remove(socket_handle);
}

if !device.recv_available() {
let next_duration = iface
.poll_delay(before_poll, &socket_set)
.unwrap_or(SmolDuration::from_millis(5));
if next_duration != SmolDuration::ZERO {
thread::park_timeout(Duration::from(next_duration));
if !device.recv_available() {
let next_duration = iface
.poll_delay(before_poll, &socket_set)
.unwrap_or(SmolDuration::from_millis(5));
if next_duration != SmolDuration::ZERO {
thread::park_timeout(Duration::from(next_duration));
}
}
}
}

trace!("VirtDevice::poll thread exited");
})
trace!("VirtDevice::poll thread exited");
})
.unwrap()
};

let manager_notify = Arc::new(ManagerNotify::new(manager_handle.thread().clone()));
Expand Down

0 comments on commit cc619c4

Please sign in to comment.