diff --git a/mm-client-common/src/attachment.rs b/mm-client-common/src/attachment.rs index b7fc719..5941977 100644 --- a/mm-client-common/src/attachment.rs +++ b/mm-client-common/src/attachment.rs @@ -463,6 +463,9 @@ impl AttachmentState { // Mute the attachment_ended callback once. self.reattach_required = msg.reattach_required; } + protocol::MessageType::SessionEnded(_) => { + // We just check for the fin on the attachment stream. + } protocol::MessageType::Error(error) => { self.delegate.error(error.err_code(), error.error_text); } diff --git a/mm-client-common/src/conn.rs b/mm-client-common/src/conn.rs index 48cfd62..482d81c 100644 --- a/mm-client-common/src/conn.rs +++ b/mm-client-common/src/conn.rs @@ -61,6 +61,8 @@ pub(crate) struct Conn { conn: quiche::Connection, partial_reads: HashMap, open_streams: HashSet, + + shutdown: oneshot::Receiver<()>, shutting_down: bool, incoming: crossbeam::Sender, @@ -76,6 +78,7 @@ impl Conn { incoming: crossbeam::Sender, outgoing: crossbeam::Receiver, ready: oneshot::Sender>, + shutdown: oneshot::Receiver<()>, connect_timeout: time::Duration, ) -> Result { let (hostname, server_addr) = resolve_server(addr)?; @@ -133,6 +136,8 @@ impl Conn { conn, partial_reads: HashMap::new(), open_streams: HashSet::new(), + + shutdown, shutting_down: false, incoming, @@ -152,10 +157,13 @@ impl Conn { let start = time::Instant::now(); loop { - self.poll.poll( - &mut events, - self.conn.timeout().or(Some(time::Duration::from_secs(1))), - )?; + const ONE_SECOND: time::Duration = time::Duration::from_secs(1); + let timeout = self + .conn + .timeout() + .map_or(ONE_SECOND, |d| d.min(ONE_SECOND)); + + self.poll.poll(&mut events, Some(timeout))?; let now = time::Instant::now(); if self.conn.timeout_instant().is_some_and(|t| now >= t) { @@ -185,6 +193,10 @@ impl Conn { } } + if let Ok(Some(())) = self.shutdown.try_recv() { + self.start_shutdown()?; + } + // if (now - self.stats_timer) > time::Duration::from_millis(200) { // self.stats_timer = now; // let stats = self.conn.path_stats().next().unwrap(); @@ -237,8 +249,7 @@ impl Conn { match self.incoming.send(ConnEvent::Datagram(msg)) { Ok(()) => {} Err(_) => { - self.conn.close(true, 0x00, b"")?; - self.shutting_down = true; + self.start_shutdown()?; break; } } @@ -272,9 +283,7 @@ impl Conn { break; } Err(crossbeam::TryRecvError::Disconnected) => { - trace!("shutting down"); - self.conn.close(true, 0x00, b"")?; - self.shutting_down = true; + self.start_shutdown()?; break; } } @@ -296,8 +305,7 @@ impl Conn { match self.incoming.send(ConnEvent::StreamClosed(sid)) { Ok(()) => {} Err(_) => { - self.conn.close(true, 0x00, b"")?; - self.shutting_down = true; + self.start_shutdown()?; break; } } @@ -378,8 +386,7 @@ impl Conn { match self.incoming.send(ConnEvent::StreamMessage(sid, msg)) { Ok(()) => {} Err(_) => { - self.conn.close(true, 0x00, b"")?; - self.shutting_down = true; + self.start_shutdown()?; break; } } @@ -402,6 +409,15 @@ impl Conn { Ok(()) } + + fn start_shutdown(&mut self) -> Result<(), ConnError> { + match self.conn.close(true, 0x00, b"") { + Ok(()) | Err(quiche::Error::Done) => (), + Err(e) => return Err(e.into()), + } + self.shutting_down = true; + Ok(()) + } } fn gen_scid() -> quiche::ConnectionId<'static> { diff --git a/mm-client-common/src/lib.rs b/mm-client-common/src/lib.rs index 230e786..af86173 100644 --- a/mm-client-common/src/lib.rs +++ b/mm-client-common/src/lib.rs @@ -60,6 +60,7 @@ struct ConnHandle { thread_handle: std::thread::JoinHandle>, waker: Arc, outgoing_tx: crossbeam::Sender, + shutdown_tx: oneshot::Sender<()>, } /// The result of a simple request/response interaction with the server. @@ -142,15 +143,18 @@ impl InnerClient { let Some(ConnHandle { thread_handle, waker, - outgoing_tx, + shutdown_tx, + .. }) = self.conn_handle.take() else { return Err(ClientError::Defunct); }; - drop(outgoing_tx); - + let _ = shutdown_tx.send(()); waker.wake().map_err(conn::ConnError::from)?; + if !thread_handle.is_finished() { + return Ok(()); + } match thread_handle.join() { Ok(Ok(())) => Ok(()), @@ -176,8 +180,17 @@ impl Client { let (incoming_tx, incoming_rx) = crossbeam::unbounded(); let (outgoing_tx, outgoing_rx) = crossbeam::unbounded(); let (ready_tx, ready_rx) = oneshot::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let mut conn = conn::Conn::new( + addr, + incoming_tx, + outgoing_rx, + ready_tx, + shutdown_rx, + connect_timeout, + )?; - let mut conn = conn::Conn::new(addr, incoming_tx, outgoing_rx, ready_tx, connect_timeout)?; let waker = conn.waker(); // Spawn a polling loop for the quic connection. @@ -193,6 +206,7 @@ impl Client { thread_handle, waker, outgoing_tx, + shutdown_tx, }), futures: HashMap::new(), attachments: HashMap::new(),