Skip to content

Commit

Permalink
Restructure sync_with_peer to allow for dependencies between steps
Browse files Browse the repository at this point in the history
  • Loading branch information
AljoschaMeyer committed Nov 24, 2024
1 parent 4618893 commit 6b4a89d
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 27 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wgps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ willow-encoding = { path = "../encoding", version = "0.1.0" }
ufotofu = { version = "0.4.2", features = ["std"] }
futures = { version = "0.3.31" }
either = "1.10.0"
async_cell = "0.2.2"

[dev-dependencies]
smol = "2.0.0"
Expand Down
6 changes: 3 additions & 3 deletions wgps/src/commitment_scheme/execute_prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ pub(crate) async fn execute_prelude<
P: BulkProducer<Item = u8, Error = E>,
>(
max_payload_power: u8,
our_nonce: [u8; CHALLENGE_LENGTH],
our_nonce: &[u8; CHALLENGE_LENGTH],
consumer: &mut C,
producer: &mut P,
) -> Result<ReceivedPrelude<CHALLENGE_HASH_LENGTH>, ExecutePreludeError<E>> {
let commitment = CH::hash(our_nonce);

let receive_fut = Box::pin(receive_prelude::<CHALLENGE_HASH_LENGTH, _>(producer));
let send_fut = Box::pin(send_prelude(max_payload_power, commitment, consumer));
let send_fut = Box::pin(send_prelude(max_payload_power, &commitment, consumer));

let (received_prelude, ()) = match try_select(receive_fut, send_fut).await {
Ok(Either::Left((received, send_fut))) => (received, send_fut.await?),
Expand All @@ -73,7 +73,7 @@ pub(crate) async fn execute_prelude<
Err(Either::Right((error, _))) => return Err(error.into()),
};

let msg = CommitmentReveal { nonce: our_nonce };
let msg = CommitmentReveal { nonce: &our_nonce };
msg.encode(consumer).await?;

Ok(received_prelude)
Expand Down
4 changes: 2 additions & 2 deletions wgps/src/commitment_scheme/send_prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use ufotofu::{local_nb::BulkConsumer, nb::ConsumeFullSliceError};
/// Sends the prelude of maximum payload power and what is to be the recipient's `received_commitment` via a transport.
pub(crate) async fn send_prelude<const CHALLENGE_HASH_LENGTH: usize, C: BulkConsumer<Item = u8>>(
max_payload_power: u8,
commitment: [u8; CHALLENGE_HASH_LENGTH],
commitment: &[u8; CHALLENGE_HASH_LENGTH],
transport: &mut C,
) -> Result<(), C::Error> {
transport.consume(max_payload_power).await?;

if let Err(ConsumeFullSliceError { reason, .. }) =
transport.bulk_consume_full_slice(&commitment).await
transport.bulk_consume_full_slice(commitment).await
{
return Err(reason);
}
Expand Down
91 changes: 73 additions & 18 deletions wgps/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::future::Future;

use execute_prelude::ExecutePreludeError;
use async_cell::unsync::AsyncCell;
use futures::try_join;

use receive_prelude::{ReceivePreludeError, ReceivedPrelude};
use ufotofu::local_nb::{BulkConsumer, BulkProducer, Producer};
use willow_data_model::{
grouping::{AreaOfInterest, Range3d},
Expand All @@ -15,24 +18,47 @@ mod messages;
pub use messages::*;

mod commitment_scheme;
use commitment_scheme::execute_prelude::execute_prelude;
pub use commitment_scheme::*;
use commitment_scheme::{receive_prelude::receive_prelude, send_prelude::send_prelude};

/// An error which can occur during a WGPS synchronisation session.
pub enum WgpsError<E> {
Prelude(ExecutePreludeError<E>),
/// The received max payload power was invalid, i.e. greater than 64.
PreludeMaxPayloadInvalid,
/// The transport stopped producing bytes before it could be deemed ready.
PreludeFinishedTooSoon,
/// The underlying transport emitted an error.
Transport(E),
}

impl<E> From<ExecutePreludeError<E>> for WgpsError<E> {
fn from(value: ExecutePreludeError<E>) -> Self {
Self::Prelude(value)
impl<E> From<E> for WgpsError<E> {
fn from(value: E) -> Self {
Self::Transport(value)
}
}

impl<E> From<ReceivePreludeError<E>> for WgpsError<E> {
fn from(value: ReceivePreludeError<E>) -> Self {
match value {
ReceivePreludeError::Transport(err) => err.into(),
ReceivePreludeError::MaxPayloadInvalid => WgpsError::PreludeMaxPayloadInvalid,
ReceivePreludeError::FinishedTooSoon => WgpsError::PreludeFinishedTooSoon,
}
}
}

impl<E: core::fmt::Display> core::fmt::Display for WgpsError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WgpsError::Prelude(execute_prelude_error) => write!(f, "{}", execute_prelude_error),
WgpsError::Transport(transport_error) => write!(f, "{}", transport_error),
WgpsError::PreludeMaxPayloadInvalid => write!(
f,
"The peer sent an invalid max payload power in their prelude."
),
WgpsError::PreludeFinishedTooSoon => write!(
f,
"The peer terminated the connection before sending their full prelude."
),
}
}
}
Expand All @@ -51,19 +77,48 @@ pub async fn sync_with_peer<
P: BulkProducer<Item = u8, Error = E>,
>(
options: &SyncOptions<CHALLENGE_LENGTH>,
mut consumer: C,
consumer: C,
mut producer: P,
) -> Result<(), WgpsError<E>> {
execute_prelude::<CHALLENGE_LENGTH, CHALLENGE_HASH_LENGTH, CH, _, _, _>(
options.max_payload_power,
options.challenge_nonce,
&mut consumer,
&mut producer,
)
.await?;

// TODO: The rest of the WGPS. No probs!

// Compute the commitment to our [nonce](https://willowprotocol.org/specs/sync/index.html#nonce); we send this
// commitment to the peer at the start (the *prelude*) of the sync session.
let our_commitment = CH::hash(&options.challenge_nonce);

let mut consumer = consumer; // TODO turn into a SharedConsumer in an Arc here. See util::shared_encoder on the logical_channels branch (and ask Aljoscha about it).

// This is set to `()` once our own prelude has been sent.
let sent_own_prelude = AsyncCell::<()>::new();
// This is set to the prelude received from the peer once it has arrived.
let received_prelude = AsyncCell::<ReceivedPrelude<CHALLENGE_HASH_LENGTH>>::new();

// Every unit of work that the WGPS needs to perform is defined as a future in the following, via an async block.
// If one of these futures needs another unit of work to have been completed, this should be enforced by
// calling `some_cell.get().await` for one of the cells defined above. Generally, these futures should *not* call
// `some_cell.take().await`, since that might mess with other futures depending on the same step to have completed.
//
// Each of the futures must evaluate to a `Result<(), WgpsError<E>>`.
// Since type annotations for async blocks are not possible in today's rust, we instead provide
// a type annotation on the return value; that's why the last two lines of each of the following
// async blocks are a weirdly overcomplicated way of returning `Ok(())`.

let do_send_prelude = async {
send_prelude(options.max_payload_power, &our_commitment, &mut consumer).await?;
sent_own_prelude.set(());

let ret: Result<(), WgpsError<E>> = Ok(());
ret
};

let do_receive_prelude = async {
received_prelude.set(receive_prelude(&mut producer).await?);

let ret: Result<(), WgpsError<E>> = Ok(());
ret
};

// Add each of the futures here. The macro polls them all to completion, until the first one hits
// an error, in which case this function immediately returns with that first error.
let ((), ()) = try_join!(do_send_prelude, do_receive_prelude,)?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions wgps/src/messages/commitment_reveal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use ufotofu::local_nb::BulkConsumer;
use willow_encoding::Encodable;

pub struct CommitmentReveal<const CHALLENGE_LENGTH: usize> {
pub nonce: [u8; CHALLENGE_LENGTH],
pub struct CommitmentReveal<'nonce, const CHALLENGE_LENGTH: usize> {
pub nonce: &'nonce [u8; CHALLENGE_LENGTH],
}

impl<const CHALLENGE_LENGTH: usize> Encodable for CommitmentReveal<CHALLENGE_LENGTH> {
impl<'nonce, const CHALLENGE_LENGTH: usize> Encodable for CommitmentReveal<'nonce, CHALLENGE_LENGTH> {
async fn encode<Consumer>(&self, consumer: &mut Consumer) -> Result<(), Consumer::Error>
where
Consumer: BulkConsumer<Item = u8>,
Expand Down
2 changes: 1 addition & 1 deletion wgps/src/parameters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub trait ChallengeHash<const CHALLENGE_LENGTH: usize, const CHALLENGE_HASH_LENGTH: usize> {
fn hash(nonce: [u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH];
fn hash(nonce: &[u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH];
}

0 comments on commit 6b4a89d

Please sign in to comment.