Skip to content

Commit

Permalink
fix(client-common): attempt to handle shutdown more gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
colinmarc committed Aug 25, 2024
1 parent cce6dc1 commit 46f10b6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 17 deletions.
3 changes: 3 additions & 0 deletions mm-client-common/src/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ impl AttachmentState {
// Mute the attachment_ended callback once.
self.reattach_required = msg.reattach_required;
}
protocol::MessageType::SessionEnded(_) => {
// We just check for the fin on the attachment stream.
}
protocol::MessageType::Error(error) => {
self.delegate.error(error.err_code(), error.error_text);
}
Expand Down
42 changes: 29 additions & 13 deletions mm-client-common/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub(crate) struct Conn {
conn: quiche::Connection,
partial_reads: HashMap<u64, bytes::BytesMut>,
open_streams: HashSet<u64>,

shutdown: oneshot::Receiver<()>,
shutting_down: bool,

incoming: crossbeam::Sender<ConnEvent>,
Expand All @@ -76,6 +78,7 @@ impl Conn {
incoming: crossbeam::Sender<ConnEvent>,
outgoing: crossbeam::Receiver<OutgoingMessage>,
ready: oneshot::Sender<Result<(), ConnError>>,
shutdown: oneshot::Receiver<()>,
connect_timeout: time::Duration,
) -> Result<Self, ConnError> {
let (hostname, server_addr) = resolve_server(addr)?;
Expand Down Expand Up @@ -133,6 +136,8 @@ impl Conn {
conn,
partial_reads: HashMap::new(),
open_streams: HashSet::new(),

shutdown,
shutting_down: false,

incoming,
Expand All @@ -152,10 +157,13 @@ impl Conn {
let start = time::Instant::now();

loop {
self.poll.poll(
&mut events,
self.conn.timeout().or(Some(time::Duration::from_secs(1))),
)?;
const ONE_SECOND: time::Duration = time::Duration::from_secs(1);
let timeout = self
.conn
.timeout()
.map_or(ONE_SECOND, |d| d.min(ONE_SECOND));

self.poll.poll(&mut events, Some(timeout))?;

let now = time::Instant::now();
if self.conn.timeout_instant().is_some_and(|t| now >= t) {
Expand Down Expand Up @@ -185,6 +193,10 @@ impl Conn {
}
}

if let Ok(Some(())) = self.shutdown.try_recv() {
self.start_shutdown()?;
}

// if (now - self.stats_timer) > time::Duration::from_millis(200) {
// self.stats_timer = now;
// let stats = self.conn.path_stats().next().unwrap();
Expand Down Expand Up @@ -237,8 +249,7 @@ impl Conn {
match self.incoming.send(ConnEvent::Datagram(msg)) {
Ok(()) => {}
Err(_) => {
self.conn.close(true, 0x00, b"")?;
self.shutting_down = true;
self.start_shutdown()?;
break;
}
}
Expand Down Expand Up @@ -272,9 +283,7 @@ impl Conn {
break;
}
Err(crossbeam::TryRecvError::Disconnected) => {
trace!("shutting down");
self.conn.close(true, 0x00, b"")?;
self.shutting_down = true;
self.start_shutdown()?;
break;
}
}
Expand All @@ -296,8 +305,7 @@ impl Conn {
match self.incoming.send(ConnEvent::StreamClosed(sid)) {
Ok(()) => {}
Err(_) => {
self.conn.close(true, 0x00, b"")?;
self.shutting_down = true;
self.start_shutdown()?;
break;
}
}
Expand Down Expand Up @@ -378,8 +386,7 @@ impl Conn {
match self.incoming.send(ConnEvent::StreamMessage(sid, msg)) {
Ok(()) => {}
Err(_) => {
self.conn.close(true, 0x00, b"")?;
self.shutting_down = true;
self.start_shutdown()?;
break;
}
}
Expand All @@ -402,6 +409,15 @@ impl Conn {

Ok(())
}

fn start_shutdown(&mut self) -> Result<(), ConnError> {
match self.conn.close(true, 0x00, b"") {
Ok(()) | Err(quiche::Error::Done) => (),
Err(e) => return Err(e.into()),
}
self.shutting_down = true;
Ok(())
}
}

fn gen_scid() -> quiche::ConnectionId<'static> {
Expand Down
22 changes: 18 additions & 4 deletions mm-client-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ConnHandle {
thread_handle: std::thread::JoinHandle<Result<(), conn::ConnError>>,
waker: Arc<mio::Waker>,
outgoing_tx: crossbeam::Sender<conn::OutgoingMessage>,
shutdown_tx: oneshot::Sender<()>,
}

/// The result of a simple request/response interaction with the server.
Expand Down Expand Up @@ -142,15 +143,18 @@ impl InnerClient {
let Some(ConnHandle {
thread_handle,
waker,
outgoing_tx,
shutdown_tx,
..
}) = self.conn_handle.take()
else {
return Err(ClientError::Defunct);
};

drop(outgoing_tx);

let _ = shutdown_tx.send(());
waker.wake().map_err(conn::ConnError::from)?;
if !thread_handle.is_finished() {
return Ok(());
}

match thread_handle.join() {
Ok(Ok(())) => Ok(()),
Expand All @@ -176,8 +180,17 @@ impl Client {
let (incoming_tx, incoming_rx) = crossbeam::unbounded();
let (outgoing_tx, outgoing_rx) = crossbeam::unbounded();
let (ready_tx, ready_rx) = oneshot::channel();
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let mut conn = conn::Conn::new(
addr,
incoming_tx,
outgoing_rx,
ready_tx,
shutdown_rx,
connect_timeout,
)?;

let mut conn = conn::Conn::new(addr, incoming_tx, outgoing_rx, ready_tx, connect_timeout)?;
let waker = conn.waker();

// Spawn a polling loop for the quic connection.
Expand All @@ -193,6 +206,7 @@ impl Client {
thread_handle,
waker,
outgoing_tx,
shutdown_tx,
}),
futures: HashMap::new(),
attachments: HashMap::new(),
Expand Down

0 comments on commit 46f10b6

Please sign in to comment.