Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow async implementation for BackupReader and BackupWriter #1

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};

use async_trait::async_trait;
use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use log::{error, info, warn};
Expand Down Expand Up @@ -63,14 +64,29 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: Read> {
#[async_trait]
pub trait BackupReader {
/// Read the entire backup.
async fn read(&mut self) -> std::io::Result<Vec<u8>>;
}

#[async_trait]
impl<R: Read + Send> BackupReader for R {
async fn read(&mut self) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::new();
self.read_to_end(&mut buf)?;
Ok(buf)
}
Comment on lines +75 to +79

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this impl is still blocking an async task afaik.

}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: BackupReader> {
backup: R,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: BackupReader> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup,
Expand All @@ -80,9 +96,8 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
}
}

fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf)?;
async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let buf = self.backup.read().await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down Expand Up @@ -163,7 +178,7 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
starting_round: oneshot::Sender<Option<Round>>,
next_round_collection: oneshot::Receiver<Round>,
) {
let units = match self.load() {
let units = match self.load().await {
Ok(items) => items,
Err(e) => {
error!(target: LOG_TARGET, "unable to load backup data: {}", e);
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/backup/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use loader::BackupLoader;
pub use loader::BackupReader;
pub use saver::BackupSaver;
pub use saver::BackupWriter;

mod loader;
mod saver;
31 changes: 24 additions & 7 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
use std::io::Write;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use async_trait::async_trait;
use codec::Encode;
use futures::{FutureExt, StreamExt};
use log::{debug, error};

const LOG_TARGET: &str = "AlephBFT-backup-saver";

#[async_trait]
/// Write backups to peristent storage.
pub trait BackupWriter {
/// Append new data to the backup.
async fn append(&mut self, data: &[u8]) -> std::io::Result<()>;
}

#[async_trait]
impl<W: Write + Send> BackupWriter for W {
async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
Write::write_all(self, data)?;
self.flush()
}
Comment on lines +20 to +23

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this have to be in some wrapper that tells the async executor that it should be run on a thread that is allowed to block? Otherwise we only fix the problem in case downstream users don't use a writer but implement BackupWriter themselves. Maybe that's the best way to go anyway and we should just remove this default impl?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so to avoid bigger changes we can keep it this way if it's only used for testing anyway.

}

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: Write> {
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: BackupWriter> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
backup: W,
}

impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, S: Signature, W: BackupWriter> BackupSaver<H, D, S, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
Expand All @@ -29,10 +45,11 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
}
}

pub fn save_item(&mut self, item: &UncheckedSignedUnit<H, D, S>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.append(&item.encode()).await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand All @@ -47,7 +64,7 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item) {
if let Err(e) = self.save_item(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use aleph_bft_types::{
PartialMultisignature, PartiallyMultisigned, Recipient, Round, SessionId, Signable, Signature,
SignatureError, SignatureSet, Signed, SpawnHandle, TaskHandle, UncheckedSigned,
};
pub use backup::{BackupReader, BackupWriter};
pub use config::{
create_config, default_config, default_delay_config, exponential_slowdown, Config, DelayConfig,
};
Expand Down
26 changes: 19 additions & 7 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::{
},
task_queue::TaskQueue,
units::{UncheckedSignedUnit, UnitCoord},
Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex,
Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher,
MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle,
Terminator, UncheckedSigned,
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
Expand Down Expand Up @@ -108,16 +109,27 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read> {
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: BackupWriter,
UL: BackupReader,
> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read>
LocalIO<D, DP, FH, US, UL>
impl<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: BackupWriter,
UL: BackupReader,
> LocalIO<D, DP, FH, US, UL>
{
pub fn new(
data_provider: DP,
Expand Down Expand Up @@ -573,8 +585,8 @@ pub async fn run_session<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
Expand Down
18 changes: 9 additions & 9 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::{
ControlHash, PreUnit, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore,
UnitStoreStatus, Validator,
},
Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain,
NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle,
Terminator, UncheckedSigned,
BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher, Index,
Keychain, MultiKeychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature,
Signed, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::Recipient;
use futures::{
Expand Down Expand Up @@ -871,8 +871,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -887,8 +887,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down Expand Up @@ -919,8 +919,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
) where
H: Hasher,
D: Data,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
MK: MultiKeychain,
Expand Down