Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow async implementation for BackupReader and BackupWriter #399

Merged
Merged 3 commits on Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
21 changes: 18 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.33"
aleph-bft = "^0.34"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.33.2"
version = "0.34.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
23 changes: 14 additions & 9 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};
use std::{
collections::HashSet,
fmt::{self, Debug},
marker::PhantomData,
pin::Pin,
};

use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use futures::{channel::oneshot, AsyncRead, AsyncReadExt};
use log::{error, info, warn};

use crate::{
Expand Down Expand Up @@ -63,26 +68,26 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: Read> {
backup: R,
pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: AsyncRead> {
backup: Pin<Box<R>>,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup,
backup: Box::pin(backup),
index,
session_id,
_phantom: PhantomData,
}
}

fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf)?;
self.backup.read_to_end(&mut buf).await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down Expand Up @@ -163,7 +168,7 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
starting_round: oneshot::Sender<Option<Round>>,
next_round_collection: oneshot::Receiver<Round>,
) {
let units = match self.load() {
let units = match self.load().await {
Ok(items) => items,
Err(e) => {
error!(target: LOG_TARGET, "unable to load backup data: {}", e);
Expand Down
24 changes: 13 additions & 11 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::io::Write;
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use codec::Encode;
use futures::{FutureExt, StreamExt};
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};

const LOG_TARGET: &str = "AlephBFT-backup-saver";

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: Write> {
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
backup: W,
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
Expand All @@ -25,14 +25,16 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup,
backup: Box::pin(backup),
}
}

pub fn save_item(&mut self, item: &UncheckedSignedUnit<H, D, S>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
self.backup.flush().await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand All @@ -47,7 +49,7 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item) {
if let Err(e) = self.save_item(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down
17 changes: 11 additions & 6 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
Expand All @@ -23,7 +23,6 @@ use std::{
collections::HashSet,
convert::TryInto,
fmt::{self, Debug},
io::{Read, Write},
marker::PhantomData,
time::Duration,
};
Expand Down Expand Up @@ -108,15 +107,21 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read> {
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: AsyncWrite,
UL: AsyncRead,
> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read>
impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
LocalIO<D, DP, FH, US, UL>
{
pub fn new(
Expand Down Expand Up @@ -573,8 +578,8 @@ pub async fn run_session<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
Expand Down
24 changes: 9 additions & 15 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,15 @@ use crate::{
Terminator, UncheckedSigned,
};
use aleph_bft_types::Recipient;
use futures::AsyncWrite;
use futures::{
channel::{mpsc, oneshot},
pin_mut, Future, FutureExt, StreamExt,
pin_mut, AsyncRead, Future, FutureExt, StreamExt,
};
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use std::{
collections::HashSet,
convert::TryFrom,
fmt,
io::{Read, Write},
marker::PhantomData,
time::Duration,
};
use std::{collections::HashSet, convert::TryFrom, fmt, marker::PhantomData, time::Duration};

mod collection;
mod packer;
Expand Down Expand Up @@ -871,8 +865,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -887,8 +881,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down Expand Up @@ -919,8 +913,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
) where
H: Hasher,
D: Data,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
MK: MultiKeychain,
Expand Down
3 changes: 2 additions & 1 deletion examples/ordering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ futures = "0.3"
log = "0.4"
parking_lot = "0.12"
time = { version = "0.3", features = ["formatting", "macros", "local-offset"] }
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time"] }
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time", "fs"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
24 changes: 16 additions & 8 deletions examples/ordering/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::io::Write;
mod dataio;
mod network;

use aleph_bft::{run_session, NodeIndex, Terminator};
use aleph_bft_mock::{Keychain, Spawner};
use clap::Parser;
use dataio::{Data, DataProvider, FinalizationHandler};
use futures::{channel::oneshot, StreamExt};
use futures::{channel::oneshot, io, StreamExt};
use log::{debug, error, info};
use network::Network;
use std::{collections::HashMap, fs, fs::File, io, io::Write, path::Path, time::Duration};
use std::{collections::HashMap, path::Path, time::Duration};
use time::{macros::format_description, OffsetDateTime};
use tokio::fs::{self, File};
use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};

/// Example node producing linear order.
#[derive(Parser, Debug)]
Expand Down Expand Up @@ -40,20 +43,23 @@ struct Args {
crash: bool,
}

fn create_backup(node_id: NodeIndex) -> Result<(File, io::Cursor<Vec<u8>>), io::Error> {
async fn create_backup(
node_id: NodeIndex,
) -> Result<(Compat<File>, io::Cursor<Vec<u8>>), io::Error> {
let stash_path = Path::new("./aleph-bft-examples-ordering-backup");
fs::create_dir_all(stash_path)?;
fs::create_dir_all(stash_path).await?;
let file_path = stash_path.join(format!("{}.units", node_id.0));
let loader = if file_path.exists() {
io::Cursor::new(fs::read(&file_path)?)
io::Cursor::new(fs::read(&file_path).await?)
} else {
io::Cursor::new(Vec::new())
};
let saver = fs::OpenOptions::new()
.create(true)
.append(true)
.open(file_path)?;
Ok((saver, loader))
.open(file_path)
.await?;
Ok((saver.compat_write(), loader))
}

fn finalized_counts(cf: &HashMap<NodeIndex, u32>) -> Vec<u32> {
Expand Down Expand Up @@ -104,7 +110,9 @@ async fn main() {
let n_members = ports.len().into();
let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled);
let (finalization_handler, mut finalized_rx) = FinalizationHandler::new();
let (backup_saver, backup_loader) = create_backup(id).expect("Error setting up unit saving");
let (backup_saver, backup_loader) = create_backup(id)
.await
.expect("Error setting up unit saving");
let local_io = aleph_bft::LocalIO::new(
data_provider,
finalization_handler,
Expand Down
2 changes: 1 addition & 1 deletion mock/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft-mock"
version = "0.11.1"
version = "0.12.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
documentation = "https://docs.rs/?"
Expand Down
Loading…