Skip to content

Commit

Permalink
chore(cdk-rust): simplify and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
kckeiks committed Oct 16, 2023
1 parent 00f97b8 commit 671b687
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 82 deletions.
19 changes: 8 additions & 11 deletions lib/cdk-rust/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use tokio::sync::mpsc;

use crate::connection;
use crate::context::Context;
use crate::driver::{Driver, Handle};
use crate::handle::Handle;
use crate::mode::{Mode, ModeSetting, PrimaryMode, SecondaryMode};
use crate::transport::Transport;

/// Builds a client or driver.
/// Builds a client.
pub struct Builder<M, T> {
mode: M,
transport: Option<T>,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl<T: Transport> Builder<PrimaryMode, T> {
}

impl<T: Transport> Builder<PrimaryMode, AttachedTransport<T>> {
pub fn drive<D: Driver>(self, driver: D) -> Handle {
pub fn handle(self) -> Handle {
// This unwrap is safe because `transport()` is required (using the type system).
let transport = self.transport.unwrap().0;
let (tx, rx) = mpsc::channel(1024);
Expand All @@ -61,9 +61,7 @@ impl<T: Transport> Builder<PrimaryMode, AttachedTransport<T>> {
// Todo: better default?
self.pk.unwrap_or(ClientPublicKey([1; 96])),
);
tokio::spawn(connection::connect_and_drive::<T, D>(
transport, driver, rx, context,
));
tokio::spawn(connection::connect_and_drive::<T>(transport, rx, context));
Handle::new(tx)
}
}
Expand All @@ -83,18 +81,17 @@ impl<T: Transport> Builder<SecondaryMode, T> {
}

impl<T: Transport> Builder<SecondaryMode, AttachedTransport<T>> {
pub fn drive<D: Driver>(self, driver: D) -> Handle {
// This unwrap is safe because `transport()` is required (using the type system).
pub fn handle(self) -> Handle {
// This unwrap is safe because `transport()` because this method is only available
// after attaching a transport.
let transport = self.transport.unwrap().0;
let (tx, rx) = mpsc::channel(1024);
let context = Context::new(
ModeSetting::Secondary(self.mode),
// Todo: better default?
self.pk.unwrap_or(ClientPublicKey([1; 96])),
);
tokio::spawn(connection::connect_and_drive::<T, D>(
transport, driver, rx, context,
));
tokio::spawn(connection::connect_and_drive::<T>(transport, rx, context));
Handle::new(tx)
}
}
Expand Down
60 changes: 17 additions & 43 deletions lib/cdk-rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,35 @@ use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc::Receiver;

use crate::context::Context;
use crate::driver::{Driver, Event, RequestResponse};
use crate::handle::RequestResponse;
use crate::mode::{ModeSetting, PrimaryMode, SecondaryMode};
use crate::schema::{HandshakeRequestFrame, RequestFrame, ResponseFrame};
use crate::transport::Transport;

pub async fn connect_and_drive<T: Transport, D: Driver>(
pub async fn connect_and_drive<T: Transport>(
transport: T,
mut driver: D,
request_rx: Receiver<RequestResponse>,
mut ctx: Context,
ctx: Context,
) -> anyhow::Result<()> {
let (mut tx, mut rx) = transport.connect().await?;

let success = handshake::<T>((&mut tx, &mut rx), &ctx).await?;

driver.drive(Event::Connection { success }, &mut ctx);

if success {
bail!("handshake failed");
match ctx.mode() {
ModeSetting::Primary(setting) => {
start_handshake::<T>((&mut tx, &mut rx), setting, *ctx.pk()).await?
},
ModeSetting::Secondary(setting) => {
join_connection::<T>((&mut tx, &mut rx), setting).await?
},
}

let _ = connection_loop::<T, D>((tx, rx), request_rx, &mut ctx, &mut driver).await;

// Todo: Add termination reason if applicable.
driver.drive(Event::Disconnect { reason: None }, &mut ctx);

Ok(())
}

async fn handshake<T: Transport>(
(tx, rx): (&mut T::Sender, &mut T::Receiver),
ctx: &Context,
) -> anyhow::Result<bool> {
let success = match ctx.mode() {
ModeSetting::Primary(setting) => start_handshake::<T>((tx, rx), setting, *ctx.pk()).await?,
ModeSetting::Secondary(setting) => join_connection::<T>((tx, rx), setting).await?,
};

Ok(success)
connection_loop::<T>((tx, rx), request_rx).await
}

async fn start_handshake<T: Transport>(
(tx, _): (&mut T::Sender, &mut T::Receiver),
setting: &PrimaryMode,
pk: ClientPublicKey,
) -> anyhow::Result<bool> {
) -> anyhow::Result<()> {
tx.send(
HandshakeRequestFrame::Handshake {
retry: None,
Expand All @@ -64,13 +47,13 @@ async fn start_handshake<T: Transport>(

// Todo: Complete HANDSHAKE.

Ok(true)
Ok(())
}

async fn join_connection<T: Transport>(
(tx, _): (&mut T::Sender, &mut T::Receiver),
setting: &SecondaryMode,
) -> anyhow::Result<bool> {
) -> anyhow::Result<()> {
tx.send(
HandshakeRequestFrame::JoinRequest {
access_token: setting.access_token,
Expand All @@ -81,17 +64,15 @@ async fn join_connection<T: Transport>(

// Todo: Complete JOIN.

Ok(true)
Ok(())
}

async fn connection_loop<T: Transport, D: Driver>(
async fn connection_loop<T: Transport>(
(mut tx, mut rx): (T::Sender, T::Receiver),
mut request_rx: Receiver<RequestResponse>,
ctx: &mut Context,
driver: &mut D,
) -> anyhow::Result<()> {
while let Some(request) = request_rx.recv().await {
// Todo: If (tx, rx) was a individual (QUIC) stream,
// Todo: If (tx, rx) was an individual (QUIC) stream,
// we could move this pair in a separate task and
// avoid waiting.
tx.send(RequestFrame::from(request.payload).encode())
Expand All @@ -101,13 +82,6 @@ async fn connection_loop<T: Transport, D: Driver>(
bail!("connection closed unexpectedly");
};

driver.drive(
Event::PayloadReceived {
bytes: bytes.as_ref(),
},
ctx,
);

tokio::spawn(async move {
match ResponseFrame::decode(bytes.as_ref()) {
Ok(response) => {
Expand Down
13 changes: 2 additions & 11 deletions lib/cdk-rust/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ use crate::mode::ModeSetting;

pub struct Context {
mode: ModeSetting,
// The provider's public key.
pk: ClientPublicKey,
shutdown: bool,
}

impl Context {
pub fn new(mode: ModeSetting, pk: ClientPublicKey) -> Self {
Self {
mode,
pk,
shutdown: false,
}
Self { mode, pk }
}

/// Returns the mode setting of the connection.
Expand All @@ -25,9 +21,4 @@ impl Context {
pub fn pk(&self) -> &ClientPublicKey {
&self.pk
}

// Shut downs the connection.
pub fn shutdown(&mut self) {
self.shutdown = true;
}
}
17 changes: 1 addition & 16 deletions lib/cdk-rust/src/driver.rs → lib/cdk-rust/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@ use bytes::Bytes;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;

use crate::context::Context;
use crate::schema::{RequestFrame, ResponseFrame, TerminationReason};
use crate::schema::{RequestFrame, ResponseFrame};

pub type Response = ResponseFrame;

pub trait Driver: Send + 'static {
fn drive(&mut self, _: Event<'_>, _: &mut Context) {}
}

pub enum Request {
/// Raw message to be sent to the service implementation.
ServicePayload { bytes: Bytes },
Expand All @@ -23,12 +18,6 @@ pub enum Request {
ExtendAccessToken { ttl: u64 },
}

pub enum Event<'a> {
Connection { success: bool },
PayloadReceived { bytes: &'a [u8] },
Disconnect { reason: Option<TerminationReason> },
}

pub struct RequestResponse {
pub payload: Request,
pub respond: oneshot::Sender<Response>,
Expand Down Expand Up @@ -63,10 +52,6 @@ impl Handle {
}
}
}

pub fn disconnect(self) {
// Dropping the senders will cause the connection to disconnect.
}
}

// RequestFrame is internal and includes frames
Expand Down
2 changes: 1 addition & 1 deletion lib/cdk-rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod builder;
mod connection;
mod context;
mod driver;
mod handle;
mod mode;
mod schema;
mod tls;
Expand Down

0 comments on commit 671b687

Please sign in to comment.