diff --git a/Cargo.lock b/Cargo.lock index 97271385..220ee7f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.33.1" +version = "0.34.0" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", @@ -97,11 +97,12 @@ dependencies = [ "parking_lot", "time", "tokio", + "tokio-util", ] [[package]] name = "aleph-bft-mock" -version = "0.11.1" +version = "0.12.0" dependencies = [ "aleph-bft-types", "async-trait", @@ -115,7 +116,7 @@ dependencies = [ [[package]] name = "aleph-bft-rmc" -version = "0.11.0" +version = "0.11.1" dependencies = [ "aleph-bft-crypto", "aleph-bft-mock", @@ -1122,6 +1123,20 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.3" diff --git a/README.md b/README.md index 6ff99334..177e9bac 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details]. - Import AlephBFT in your crate ```toml [dependencies] - aleph-bft = "^0.33" + aleph-bft = "^0.34" ``` - The main entry point is the `run_session` function, which returns a Future that runs the consensus algorithm. diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 34a30857..3d19d37b 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.33.2" +version = "0.34.0" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index c7b960b6..7dc76942 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -1,7 +1,12 @@ -use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData}; +use std::{ + collections::HashSet, + fmt::{self, Debug}, + marker::PhantomData, + pin::Pin, +}; use codec::{Decode, Error as CodecError}; -use futures::channel::oneshot; +use futures::{channel::oneshot, AsyncRead, AsyncReadExt}; use log::{error, info, warn}; use crate::{ @@ -63,26 +68,26 @@ impl From for LoaderError { } } -pub struct BackupLoader { - backup: R, +pub struct BackupLoader { + backup: Pin>, index: NodeIndex, session_id: SessionId, _phantom: PhantomData<(H, D, S)>, } -impl BackupLoader { +impl BackupLoader { pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { BackupLoader { - backup, + backup: Box::pin(backup), index, session_id, _phantom: PhantomData, } } - fn load(&mut self) -> Result>, LoaderError> { + async fn load(&mut self) -> Result>, LoaderError> { let mut buf = Vec::new(); - self.backup.read_to_end(&mut buf)?; + self.backup.read_to_end(&mut buf).await?; let input = &mut &buf[..]; let mut result = Vec::new(); while !input.is_empty() { @@ -163,7 +168,7 @@ impl BackupLoader { starting_round: oneshot::Sender>, next_round_collection: oneshot::Receiver, ) { - let units = match self.load() { + let units = match self.load().await { Ok(items) => items, Err(e) => { error!(target: LOG_TARGET, "unable to load backup data: {}", e); diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index c4657f74..f13e25a6 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -1,8 +1,8 @@ -use std::io::Write; +use std::pin::Pin; use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator}; use codec::Encode; -use futures::{FutureExt, StreamExt}; +use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use log::{debug, error}; const LOG_TARGET: &str = "AlephBFT-backup-saver"; @@ -10,13 +10,13 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver"; /// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { +pub struct BackupSaver { units_from_runway: Receiver>, responses_for_runway: Sender>, - backup: W, + backup: Pin>, } -impl BackupSaver { +impl BackupSaver { pub fn new( units_from_runway: Receiver>, responses_for_runway: Sender>, @@ -25,14 +25,16 @@ impl BackupSaver { BackupSaver { units_from_runway, responses_for_runway, - backup, + backup: Box::pin(backup), } } - pub fn save_item(&mut self, item: &UncheckedSignedUnit) -> Result<(), std::io::Error> { - self.backup.write_all(&item.encode())?; - self.backup.flush()?; - Ok(()) + pub async fn save_item( + &mut self, + item: &UncheckedSignedUnit, + ) -> Result<(), std::io::Error> { + self.backup.write_all(&item.encode()).await?; + self.backup.flush().await } pub async fn run(&mut self, mut terminator: Terminator) { @@ -47,7 +49,7 @@ impl BackupSaver { break; }, }; - if let Err(e) = self.save_item(&item) { + if let Err(e) = self.save_item(&item).await { error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); break; } diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 196356a8..defba2ba 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -13,7 +13,7 @@ use crate::{ }; use aleph_bft_types::NodeMap; use codec::{Decode, Encode}; -use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; +use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt}; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; @@ -23,7 +23,6 @@ use std::{ collections::HashSet, convert::TryInto, fmt::{self, Debug}, - io::{Read, Write}, marker::PhantomData, time::Duration, }; @@ -108,7 +107,13 @@ enum TaskDetails { } #[derive(Clone)] -pub struct LocalIO, FH: FinalizationHandler, US: Write, UL: Read> { +pub struct LocalIO< + D: Data, + DP: DataProvider, + FH: FinalizationHandler, + US: AsyncWrite, + UL: AsyncRead, +> { data_provider: DP, finalization_handler: FH, unit_saver: US, @@ -116,7 +121,7 @@ pub struct LocalIO, FH: FinalizationHandler, US: _phantom: PhantomData, } -impl, FH: FinalizationHandler, US: Write, UL: Read> +impl, FH: FinalizationHandler, US: AsyncWrite, UL: AsyncRead> LocalIO { pub fn new( @@ -573,8 +578,8 @@ pub async fn run_session< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: Write + Send + Sync + 'static, - UL: Read + Send + Sync + 'static, + US: AsyncWrite + Send + Sync + 'static, + UL: AsyncRead + Send + Sync + 'static, N: Network> + 'static, SH: SpawnHandle, MK: MultiKeychain, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 19eed02d..5e262ef5 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -11,21 +11,15 @@ use crate::{ Terminator, UncheckedSigned, }; use aleph_bft_types::Recipient; +use futures::AsyncWrite; use futures::{ channel::{mpsc, oneshot}, - pin_mut, Future, FutureExt, StreamExt, + pin_mut, AsyncRead, Future, FutureExt, StreamExt, }; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; -use std::{ - collections::HashSet, - convert::TryFrom, - fmt, - io::{Read, Write}, - marker::PhantomData, - time::Duration, -}; +use std::{collections::HashSet, convert::TryFrom, fmt, marker::PhantomData, time::Duration}; mod collection; mod packer; @@ -871,8 +865,8 @@ pub struct RunwayIO< H: Hasher, D: Data, MK: MultiKeychain, - W: Write + Send + Sync + 'static, - R: Read + Send + Sync + 'static, + W: AsyncWrite + Send + Sync + 'static, + R: AsyncRead + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > { @@ -887,8 +881,8 @@ impl< H: Hasher, D: Data, MK: MultiKeychain, - W: Write + Send + Sync + 'static, - R: Read + Send + Sync + 'static, + W: AsyncWrite + Send + Sync + 'static, + R: AsyncRead + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > RunwayIO @@ -919,8 +913,8 @@ pub(crate) async fn run( ) where H: Hasher, D: Data, - US: Write + Send + Sync + 'static, - UL: Read + Send + Sync + 'static, + US: AsyncWrite + Send + Sync + 'static, + UL: AsyncRead + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, MK: MultiKeychain, diff --git a/examples/ordering/Cargo.toml b/examples/ordering/Cargo.toml index ab01fd30..a810f3e4 100644 --- a/examples/ordering/Cargo.toml +++ b/examples/ordering/Cargo.toml @@ -18,4 +18,5 @@ futures = "0.3" log = "0.4" parking_lot = "0.12" time = { version = "0.3", features = ["formatting", "macros", "local-offset"] } -tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time"] } +tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time", "fs"] } +tokio-util = { version = "0.7.10", features = ["compat"] } diff --git a/examples/ordering/src/main.rs b/examples/ordering/src/main.rs index 49322311..0cab2992 100644 --- a/examples/ordering/src/main.rs +++ b/examples/ordering/src/main.rs @@ -1,3 +1,4 @@ +use std::io::Write; mod dataio; mod network; @@ -5,11 +6,13 @@ use aleph_bft::{run_session, NodeIndex, Terminator}; use aleph_bft_mock::{Keychain, Spawner}; use clap::Parser; use dataio::{Data, DataProvider, FinalizationHandler}; -use futures::{channel::oneshot, StreamExt}; +use futures::{channel::oneshot, io, StreamExt}; use log::{debug, error, info}; use network::Network; -use std::{collections::HashMap, fs, fs::File, io, io::Write, path::Path, time::Duration}; +use std::{collections::HashMap, path::Path, time::Duration}; use time::{macros::format_description, OffsetDateTime}; +use tokio::fs::{self, File}; +use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt}; /// Example node producing linear order. #[derive(Parser, Debug)] @@ -40,20 +43,23 @@ struct Args { crash: bool, } -fn create_backup(node_id: NodeIndex) -> Result<(File, io::Cursor>), io::Error> { +async fn create_backup( + node_id: NodeIndex, +) -> Result<(Compat, io::Cursor>), io::Error> { let stash_path = Path::new("./aleph-bft-examples-ordering-backup"); - fs::create_dir_all(stash_path)?; + fs::create_dir_all(stash_path).await?; let file_path = stash_path.join(format!("{}.units", node_id.0)); let loader = if file_path.exists() { - io::Cursor::new(fs::read(&file_path)?) + io::Cursor::new(fs::read(&file_path).await?) } else { io::Cursor::new(Vec::new()) }; let saver = fs::OpenOptions::new() .create(true) .append(true) - .open(file_path)?; - Ok((saver, loader)) + .open(file_path) + .await?; + Ok((saver.compat_write(), loader)) } fn finalized_counts(cf: &HashMap) -> Vec { @@ -104,7 +110,9 @@ async fn main() { let n_members = ports.len().into(); let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled); let (finalization_handler, mut finalized_rx) = FinalizationHandler::new(); - let (backup_saver, backup_loader) = create_backup(id).expect("Error setting up unit saving"); + let (backup_saver, backup_loader) = create_backup(id) + .await + .expect("Error setting up unit saving"); let local_io = aleph_bft::LocalIO::new( data_provider, finalization_handler, diff --git a/mock/Cargo.toml b/mock/Cargo.toml index 73a2a367..cbea9954 100644 --- a/mock/Cargo.toml +++ b/mock/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-mock" -version = "0.11.1" +version = "0.12.0" edition = "2021" authors = ["Cardinal Cryptography"] documentation = "https://docs.rs/?" diff --git a/mock/src/dataio.rs b/mock/src/dataio.rs index a9db89cc..0fdb4edc 100644 --- a/mock/src/dataio.rs +++ b/mock/src/dataio.rs @@ -3,12 +3,14 @@ use aleph_bft_types::{ }; use async_trait::async_trait; use codec::{Decode, Encode}; -use futures::{channel::mpsc::unbounded, future::pending}; +use futures::{channel::mpsc::unbounded, future::pending, AsyncWrite}; use log::error; use parking_lot::Mutex; use std::{ - io::{Cursor, Write}, + io::{self}, + pin::Pin, sync::Arc, + task::{self, Poll}, }; type Receiver = futures::channel::mpsc::UnboundedReceiver; @@ -101,20 +103,29 @@ impl Saver { } } -impl From>>> for Saver { - fn from(data: Arc>>) -> Self { - Self { data } +impl AsyncWrite for Saver { + fn poll_write( + self: Pin<&mut Self>, + _: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.data.lock().extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) } -} -impl Write for Saver { - fn write(&mut self, buf: &[u8]) -> Result { - self.data.lock().extend_from_slice(buf); - Ok(buf.len()) + fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn flush(&mut self) -> Result<(), std::io::Error> { - Ok(()) + + fn poll_close(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl From>>> for Saver { + fn from(data: Arc>>) -> Self { + Self { data } } } -pub type Loader = Cursor>; +pub type Loader = futures::io::Cursor>; diff --git a/rmc/Cargo.toml b/rmc/Cargo.toml index 70361d2a..9bbdbfee 100644 --- a/rmc/Cargo.toml +++ b/rmc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-rmc" -version = "0.11.0" +version = "0.11.1" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "cryptography"] diff --git a/rmc/src/handler.rs b/rmc/src/handler.rs index 31c8e477..cd6eb32d 100644 --- a/rmc/src/handler.rs +++ b/rmc/src/handler.rs @@ -1,7 +1,7 @@ //! Reliable MultiCast - a primitive for Reliable Broadcast protocol. pub use aleph_bft_crypto::{ - Indexed, MultiKeychain, Multisigned, NodeCount, PartialMultisignature, PartiallyMultisigned, - Signable, Signature, Signed, UncheckedSigned, + Indexed, MultiKeychain, Multisigned, PartialMultisignature, PartiallyMultisigned, Signable, + Signed, UncheckedSigned, }; use core::fmt::Debug; use std::{ diff --git a/rmc/src/scheduler.rs b/rmc/src/scheduler.rs index 6f5ae61b..4a3ea4ea 100644 --- a/rmc/src/scheduler.rs +++ b/rmc/src/scheduler.rs @@ -1,7 +1,3 @@ -pub use aleph_bft_crypto::{ - Indexed, MultiKeychain, Multisigned, NodeCount, PartialMultisignature, PartiallyMultisigned, - Signable, Signature, Signed, UncheckedSigned, -}; use async_trait::async_trait; use core::fmt::Debug; use futures::future::pending; diff --git a/rmc/src/service.rs b/rmc/src/service.rs index 3f7aa532..0dc60338 100644 --- a/rmc/src/service.rs +++ b/rmc/src/service.rs @@ -4,10 +4,7 @@ use crate::{ scheduler::TaskScheduler, Message, }; -pub use aleph_bft_crypto::{ - Indexed, MultiKeychain, Multisigned, NodeCount, PartialMultisignature, PartiallyMultisigned, - Signable, Signature, Signed, UncheckedSigned, -}; +pub use aleph_bft_crypto::{MultiKeychain, Multisigned, Signable}; use core::fmt::Debug; use log::{debug, warn}; use std::hash::Hash;