From 671b687f94d8ca92740dbc319f686374ac3ba6cf Mon Sep 17 00:00:00 2001 From: Miguel Date: Mon, 16 Oct 2023 19:46:31 -0400 Subject: [PATCH] chore(cdk-rust): simplify and clean up --- lib/cdk-rust/src/builder.rs | 19 +++---- lib/cdk-rust/src/connection.rs | 60 +++++++---------------- lib/cdk-rust/src/context.rs | 13 +---- lib/cdk-rust/src/{driver.rs => handle.rs} | 17 +------ lib/cdk-rust/src/lib.rs | 2 +- 5 files changed, 29 insertions(+), 82 deletions(-) rename lib/cdk-rust/src/{driver.rs => handle.rs} (81%) diff --git a/lib/cdk-rust/src/builder.rs b/lib/cdk-rust/src/builder.rs index 39abdc46c..eeaf6355a 100644 --- a/lib/cdk-rust/src/builder.rs +++ b/lib/cdk-rust/src/builder.rs @@ -3,11 +3,11 @@ use tokio::sync::mpsc; use crate::connection; use crate::context::Context; -use crate::driver::{Driver, Handle}; +use crate::handle::Handle; use crate::mode::{Mode, ModeSetting, PrimaryMode, SecondaryMode}; use crate::transport::Transport; -/// Builds a client or driver. +/// Builds a client. pub struct Builder { mode: M, transport: Option, @@ -52,7 +52,7 @@ impl Builder { } impl Builder> { - pub fn drive(self, driver: D) -> Handle { + pub fn handle(self) -> Handle { // This unwrap is safe because `transport()` is required (using the type system). let transport = self.transport.unwrap().0; let (tx, rx) = mpsc::channel(1024); @@ -61,9 +61,7 @@ impl Builder> { // Todo: better default? self.pk.unwrap_or(ClientPublicKey([1; 96])), ); - tokio::spawn(connection::connect_and_drive::( - transport, driver, rx, context, - )); + tokio::spawn(connection::connect_and_drive::(transport, rx, context)); Handle::new(tx) } } @@ -83,8 +81,9 @@ impl Builder { } impl Builder> { - pub fn drive(self, driver: D) -> Handle { - // This unwrap is safe because `transport()` is required (using the type system). + pub fn handle(self) -> Handle { + // This unwrap is safe because `transport()` because this method is only available + // after attaching a transport. let transport = self.transport.unwrap().0; let (tx, rx) = mpsc::channel(1024); let context = Context::new( @@ -92,9 +91,7 @@ impl Builder> { // Todo: better default? self.pk.unwrap_or(ClientPublicKey([1; 96])), ); - tokio::spawn(connection::connect_and_drive::( - transport, driver, rx, context, - )); + tokio::spawn(connection::connect_and_drive::(transport, rx, context)); Handle::new(tx) } } diff --git a/lib/cdk-rust/src/connection.rs b/lib/cdk-rust/src/connection.rs index 2cc32b03d..d962d0560 100644 --- a/lib/cdk-rust/src/connection.rs +++ b/lib/cdk-rust/src/connection.rs @@ -4,52 +4,35 @@ use futures::{SinkExt, StreamExt}; use tokio::sync::mpsc::Receiver; use crate::context::Context; -use crate::driver::{Driver, Event, RequestResponse}; +use crate::handle::RequestResponse; use crate::mode::{ModeSetting, PrimaryMode, SecondaryMode}; use crate::schema::{HandshakeRequestFrame, RequestFrame, ResponseFrame}; use crate::transport::Transport; -pub async fn connect_and_drive( +pub async fn connect_and_drive( transport: T, - mut driver: D, request_rx: Receiver, - mut ctx: Context, + ctx: Context, ) -> anyhow::Result<()> { let (mut tx, mut rx) = transport.connect().await?; - let success = handshake::((&mut tx, &mut rx), &ctx).await?; - - driver.drive(Event::Connection { success }, &mut ctx); - - if success { - bail!("handshake failed"); + match ctx.mode() { + ModeSetting::Primary(setting) => { + start_handshake::((&mut tx, &mut rx), setting, *ctx.pk()).await? + }, + ModeSetting::Secondary(setting) => { + join_connection::((&mut tx, &mut rx), setting).await? + }, } - let _ = connection_loop::((tx, rx), request_rx, &mut ctx, &mut driver).await; - - // Todo: Add termination reason if applicable. - driver.drive(Event::Disconnect { reason: None }, &mut ctx); - - Ok(()) -} - -async fn handshake( - (tx, rx): (&mut T::Sender, &mut T::Receiver), - ctx: &Context, -) -> anyhow::Result { - let success = match ctx.mode() { - ModeSetting::Primary(setting) => start_handshake::((tx, rx), setting, *ctx.pk()).await?, - ModeSetting::Secondary(setting) => join_connection::((tx, rx), setting).await?, - }; - - Ok(success) + connection_loop::((tx, rx), request_rx).await } async fn start_handshake( (tx, _): (&mut T::Sender, &mut T::Receiver), setting: &PrimaryMode, pk: ClientPublicKey, -) -> anyhow::Result { +) -> anyhow::Result<()> { tx.send( HandshakeRequestFrame::Handshake { retry: None, @@ -64,13 +47,13 @@ async fn start_handshake( // Todo: Complete HANDSHAKE. - Ok(true) + Ok(()) } async fn join_connection( (tx, _): (&mut T::Sender, &mut T::Receiver), setting: &SecondaryMode, -) -> anyhow::Result { +) -> anyhow::Result<()> { tx.send( HandshakeRequestFrame::JoinRequest { access_token: setting.access_token, @@ -81,17 +64,15 @@ async fn join_connection( // Todo: Complete JOIN. - Ok(true) + Ok(()) } -async fn connection_loop( +async fn connection_loop( (mut tx, mut rx): (T::Sender, T::Receiver), mut request_rx: Receiver, - ctx: &mut Context, - driver: &mut D, ) -> anyhow::Result<()> { while let Some(request) = request_rx.recv().await { - // Todo: If (tx, rx) was a individual (QUIC) stream, + // Todo: If (tx, rx) was an individual (QUIC) stream, // we could move this pair in a separate task and // avoid waiting. tx.send(RequestFrame::from(request.payload).encode()) @@ -101,13 +82,6 @@ async fn connection_loop( bail!("connection closed unexpectedly"); }; - driver.drive( - Event::PayloadReceived { - bytes: bytes.as_ref(), - }, - ctx, - ); - tokio::spawn(async move { match ResponseFrame::decode(bytes.as_ref()) { Ok(response) => { diff --git a/lib/cdk-rust/src/context.rs b/lib/cdk-rust/src/context.rs index 7440eb374..29ccd79a1 100644 --- a/lib/cdk-rust/src/context.rs +++ b/lib/cdk-rust/src/context.rs @@ -4,17 +4,13 @@ use crate::mode::ModeSetting; pub struct Context { mode: ModeSetting, + // The provider's public key. pk: ClientPublicKey, - shutdown: bool, } impl Context { pub fn new(mode: ModeSetting, pk: ClientPublicKey) -> Self { - Self { - mode, - pk, - shutdown: false, - } + Self { mode, pk } } /// Returns the mode setting of the connection. @@ -25,9 +21,4 @@ impl Context { pub fn pk(&self) -> &ClientPublicKey { &self.pk } - - // Shut downs the connection. - pub fn shutdown(&mut self) { - self.shutdown = true; - } } diff --git a/lib/cdk-rust/src/driver.rs b/lib/cdk-rust/src/handle.rs similarity index 81% rename from lib/cdk-rust/src/driver.rs rename to lib/cdk-rust/src/handle.rs index 7cc0325f1..129557c9f 100644 --- a/lib/cdk-rust/src/driver.rs +++ b/lib/cdk-rust/src/handle.rs @@ -5,15 +5,10 @@ use bytes::Bytes; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; -use crate::context::Context; -use crate::schema::{RequestFrame, ResponseFrame, TerminationReason}; +use crate::schema::{RequestFrame, ResponseFrame}; pub type Response = ResponseFrame; -pub trait Driver: Send + 'static { - fn drive(&mut self, _: Event<'_>, _: &mut Context) {} -} - pub enum Request { /// Raw message to be sent to the service implementation. ServicePayload { bytes: Bytes }, @@ -23,12 +18,6 @@ pub enum Request { ExtendAccessToken { ttl: u64 }, } -pub enum Event<'a> { - Connection { success: bool }, - PayloadReceived { bytes: &'a [u8] }, - Disconnect { reason: Option }, -} - pub struct RequestResponse { pub payload: Request, pub respond: oneshot::Sender, @@ -63,10 +52,6 @@ impl Handle { } } } - - pub fn disconnect(self) { - // Dropping the senders will cause the connection to disconnect. - } } // RequestFrame is internal and includes frames diff --git a/lib/cdk-rust/src/lib.rs b/lib/cdk-rust/src/lib.rs index 2c3765237..4298aee23 100644 --- a/lib/cdk-rust/src/lib.rs +++ b/lib/cdk-rust/src/lib.rs @@ -1,7 +1,7 @@ mod builder; mod connection; mod context; -mod driver; +mod handle; mod mode; mod schema; mod tls;