Unit saving pipeline
timorleph committed Apr 9, 2024
1 parent 82151d8 commit 0930a7d
Showing 10 changed files with 110 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.36"
aleph-bft = "^0.37"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
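
A rough, hedged sketch of the embedding pattern follows. `start_consensus` is a hypothetical application function that would internally call `aleph_bft::run_session` with the node's config, network, keychain and backup handles (the exact argument list is in the crate docs and is not reproduced here); the only point illustrated is that the returned future is spawned and shut down like any other task, assuming a tokio runtime.

```rust
use futures::channel::oneshot;

// Hypothetical wrapper: in a real node this would build the config, network,
// keychain and backup IO and pass them to `aleph_bft::run_session`.
async fn start_consensus(exit: oneshot::Receiver<()>) {
    // ... call run_session here and race it against the exit signal ...
    let _ = exit.await;
}

#[tokio::main]
async fn main() {
    let (exit_tx, exit_rx) = oneshot::channel();
    // The consensus future runs alongside the rest of the application.
    let consensus = tokio::spawn(start_consensus(exit_rx));
    // ... application work ...
    let _ = exit_tx.send(());
    let _ = consensus.await;
}
```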
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.3"
version = "0.37.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
23 changes: 17 additions & 6 deletions consensus/src/backup/loader.rs
@@ -11,7 +11,7 @@ use log::{error, info, warn};

use crate::{
units::{UncheckedSignedUnit, Unit, UnitCoord},
Data, Hasher, NodeIndex, Round, SessionId, Signature,
Data, Hasher, NodeIndex, NodeMap, Round, SessionId, Signature,
};

const LOG_TARGET: &str = "AlephBFT-backup-loader";
@@ -91,7 +91,8 @@ impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
result.push(<UncheckedSignedUnit<H, D, S>>::decode(input)?);
// TODO(A0-4234): We should use the parents information to respond to reconstruction failures.
result.push(<(UncheckedSignedUnit<H, D, S>, NodeMap<H::Hash>)>::decode(input)?.0);
}
Ok(result)
}
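
For context, the backup stream now consists of back-to-back SCALE-encoded `(unit, parents)` pairs; the loader decodes pairs until the buffer is empty and, for now, discards the parents. Below is a self-contained sketch of that round trip using simplified stand-in types (`FakeUnit` and `FakeParents` are illustrative, not the crate's `UncheckedSignedUnit` and `NodeMap`), assuming parity-scale-codec with its derive feature.

```rust
use codec::{Decode, Encode};

// Stand-ins for the real unit and parent-map types.
#[derive(Encode, Decode)]
struct FakeUnit {
    creator: u64,
    round: u16,
    data: Vec<u8>,
}

#[derive(Encode, Decode)]
struct FakeParents(Vec<Option<[u8; 8]>>);

fn main() {
    // The saver appends `(unit, parents)` tuples one after another.
    let items = vec![
        (FakeUnit { creator: 0, round: 0, data: vec![1] }, FakeParents(vec![None; 4])),
        (FakeUnit { creator: 1, round: 0, data: vec![2] }, FakeParents(vec![None; 4])),
    ];
    let mut backup = Vec::new();
    for item in &items {
        backup.extend(item.encode());
    }

    // The loader decodes tuples until the buffer is exhausted and currently
    // keeps only the unit, dropping the parents.
    let input = &mut &backup[..];
    let mut units = Vec::new();
    while !input.is_empty() {
        units.push(<(FakeUnit, FakeParents)>::decode(input).unwrap().0);
    }
    assert_eq!(units.len(), 2);
}
```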
@@ -241,15 +242,15 @@ mod tests {
use codec::Encode;
use futures::channel::oneshot;

use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature};
use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, Loader, Signature};

use crate::{
backup::BackupLoader,
units::{
create_preunits, creator_set, preunit_to_full_unit, preunit_to_unchecked_signed_unit,
UncheckedSignedUnit as GenericUncheckedSignedUnit,
UncheckedSignedUnit as GenericUncheckedSignedUnit, Unit,
},
NodeCount, NodeIndex, Round, SessionId,
NodeCount, NodeIndex, NodeMap, Round, SessionId,
};

type UncheckedSignedUnit = GenericUncheckedSignedUnit<Hasher64, Data, Signature>;
@@ -308,7 +309,17 @@ mod tests {
}

fn encode_all(items: Vec<UncheckedSignedUnit>) -> Vec<Vec<u8>> {
items.iter().map(|u| u.encode()).collect()
items
.iter()
.map(|u| {
(
u,
// for now encode empty parents as we ignore them anyway
NodeMap::<Hash64>::with_size(u.as_signable().control_hash().n_members()),
)
.encode()
})
.collect()
}

fn prepare_test(encoded_items: Vec<u8>) -> PrepareTestResponse<impl futures::Future> {
63 changes: 35 additions & 28 deletions consensus/src/backup/saver.rs
@@ -1,6 +1,10 @@
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use crate::{
dag::DagUnit,
units::{UncheckedSignedUnit, WrappedUnit},
Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};
use codec::Encode;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};
@@ -10,30 +14,29 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver";
/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, S, W> {
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup: Box::pin(backup),
}
}

pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
let parents = unit.parents().clone();
let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
self.backup.write_all(&(unit, parents).encode()).await?;
self.backup.flush().await
}

@@ -49,7 +52,7 @@ impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item).await {
if let Err(e) = self.save_unit(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
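
The pattern above, reduced to its essentials: write and flush a unit before acknowledging it, so that whoever receives the acknowledgement may treat the unit as durable. A minimal, self-contained sketch with a plain `Vec<u8>` standing in for the backup writer and `u64` for a unit; the names here are illustrative, not the crate's API.

```rust
use futures::{channel::mpsc, AsyncWriteExt, StreamExt};

fn main() {
    futures::executor::block_on(async {
        let (units_tx, mut units_rx) = mpsc::unbounded::<u64>();
        let (ack_tx, mut ack_rx) = mpsc::unbounded::<u64>();
        let mut backup: Vec<u8> = Vec::new();

        // The saver loop: write and flush each incoming unit before
        // acknowledging it back to the sender.
        let saver = async move {
            while let Some(unit) = units_rx.next().await {
                backup.write_all(&unit.to_le_bytes()).await.unwrap();
                backup.flush().await.unwrap();
                ack_tx.unbounded_send(unit).unwrap();
            }
        };

        units_tx.unbounded_send(7).unwrap();
        drop(units_tx);
        saver.await;
        // The acknowledgement arrives only after the write has been flushed.
        assert_eq!(ack_rx.next().await, Some(7));
    });
}
```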
@@ -80,16 +83,17 @@ mod tests {
StreamExt,
};

use aleph_bft_mock::{Data, Hasher64, Keychain, Saver, Signature};
use aleph_bft_mock::{Data, Hasher64, Keychain, Saver};

use crate::{
backup::BackupSaver,
units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit},
NodeCount, NodeIndex, Terminator,
dag::ReconstructedUnit,
units::{creator_set, preunit_to_signed_unit, TestingSignedUnit},
NodeCount, Terminator,
};

type TestBackupSaver = BackupSaver<Hasher64, Data, Signature, Saver>;
type TestUnit = UncheckedSignedUnit<Hasher64, Data, Signature>;
type TestUnit = ReconstructedUnit<TestingSignedUnit>;
type TestBackupSaver = BackupSaver<Hasher64, Data, Keychain, Saver>;
struct PrepareSaverResponse<F: futures::Future> {
task: F,
units_for_saver: mpsc::UnboundedSender<TestUnit>,
Expand Down Expand Up @@ -122,6 +126,7 @@ mod tests {

#[tokio::test]
async fn test_proper_relative_responses_ordering() {
let node_count = NodeCount(5);
let PrepareSaverResponse {
task,
units_for_saver,
@@ -133,17 +138,19 @@ mod tests {
task.await;
});

let creators = creator_set(NodeCount(5));
let keychains: Vec<_> = (0..5)
.map(|id| Keychain::new(NodeCount(5), NodeIndex(id)))
let creators = creator_set(node_count);
let keychains: Vec<_> = node_count
.into_iterator()
.map(|id| Keychain::new(node_count, id))
.collect();
let units: Vec<TestUnit> = (0..5)
.map(|k| {
preunit_to_unchecked_signed_unit(
creators[k].create_unit(0).unwrap(),
let units: Vec<TestUnit> = node_count
.into_iterator()
.map(|id| {
ReconstructedUnit::initial(preunit_to_signed_unit(
creators[id.0].create_unit(0).unwrap(),
0,
&keychains[k],
)
&keychains[id.0],
))
})
.collect();

24 changes: 10 additions & 14 deletions consensus/src/dag/mod.rs
@@ -20,10 +20,12 @@ use validation::{Error as ValidationError, Validator};

const LOG_TARGET: &str = "AlephBFT-dag";

pub type DagUnit<H, D, MK> = ReconstructedUnit<SignedUnit<H, D, MK>>;

/// The result of sending some information to the Dag.
pub struct DagResult<H: Hasher, D: Data, MK: MultiKeychain> {
/// Units added to the dag.
pub units: Vec<ReconstructedUnit<SignedUnit<H, D, MK>>>,
pub units: Vec<DagUnit<H, D, MK>>,
/// Requests for more information.
pub requests: Vec<Request<H>>,
/// Alerts raised due to encountered forks.
@@ -114,25 +116,16 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}

fn handle_result(&mut self, result: &DagResult<H, D, MK>) {
// just clean the validator cache of units that we are returning
for unit in &result.units {
self.validator.finished_processing(&unit.hash());
}
}

/// Add a unit to the Dag.
pub fn add_unit<U: WrappedUnit<H, Wrapped = SignedUnit<H, D, MK>>>(
&mut self,
unit: UncheckedSignedUnit<H, D, MK::Signature>,
store: &UnitStore<U>,
) -> DagResult<H, D, MK> {
let result = match self.validator.validate(unit, store) {
match self.validator.validate(unit, store) {
Ok(unit) => self.reconstruction.add_unit(unit).into(),
Err(e) => Self::handle_validation_error(e),
};
self.handle_result(&result);
result
}
}

/// Add parents of a unit to the Dag.
@@ -180,7 +173,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
.add_parents(unit_hash, parent_hashes)
.into(),
);
self.handle_result(&result);
result
}

@@ -208,10 +200,14 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}
}
self.handle_result(&result);
result
}

/// Notify the dag that a unit has finished processing and can be cleared from the cache.
pub fn finished_processing(&mut self, hash: &H::Hash) {
self.validator.finished_processing(hash);
}

pub fn status(&self) -> DagStatus {
self.validator.status()
}
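
A toy illustration of the new contract (not the crate's API): units stay in the validator's fork-detection cache from `add_unit` until the owner explicitly calls `finished_processing`, which in this commit happens only after the unit has been saved to backup.

```rust
use std::collections::HashMap;

// A toy cache mirroring the shape of the contract: entries are kept until the
// caller explicitly reports that processing has finished.
struct Cache {
    in_processing: HashMap<u64, String>,
}

impl Cache {
    fn add_unit(&mut self, hash: u64, unit: String) {
        // Keep the unit around while it is still being processed, e.g. so that
        // a later fork of it can be detected.
        self.in_processing.insert(hash, unit);
    }

    fn finished_processing(&mut self, hash: &u64) {
        // Only now is it safe to drop the cached copy.
        self.in_processing.remove(hash);
    }
}

fn main() {
    let mut cache = Cache { in_processing: HashMap::new() };
    cache.add_unit(1, "unit".to_string());
    // ... the unit is written to backup and acknowledged ...
    cache.finished_processing(&1);
    assert!(cache.in_processing.is_empty());
}
```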
41 changes: 20 additions & 21 deletions consensus/src/runway/mod.rs
@@ -1,7 +1,7 @@
use crate::{
alerts::{Alert, ForkingNotification, NetworkMessage},
creation,
dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest},
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
extension::{ExtenderUnit, Service as Extender},
handle_task_termination,
member::UnitMessage,
@@ -109,7 +109,7 @@
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<ReconstructedUnit<SignedUnit<H, D, MK>>>,
store: UnitStore<DagUnit<H, D, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
@@ -118,12 +118,12 @@
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
exiting: bool,
}
@@ -210,15 +210,15 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {

struct RunwayConfig<H: Hasher, D: Data, FH: FinalizationHandler<D>, MK: MultiKeychain> {
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
resolved_requests: Sender<Request<H>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
@@ -455,10 +455,18 @@
}
}

fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit<SignedUnit<H, D, MK>>) {
fn on_unit_reconstructed(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord());
if self.backup_units_for_saver.unbounded_send(unit).is_err() {
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
self.store.insert(unit.clone());
self.dag.finished_processing(&unit_hash);
self.resolve_missing_parents(&unit_hash);
self.resolve_missing_coord(&unit.coord());
if self
@@ -477,21 +485,12 @@
warn!(target: "AlephBFT-runway", "Creator channel should be open.");
self.exiting = true;
}
if self
.backup_units_for_saver
.unbounded_send(unit.unpack().into())
.is_err()
{
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: UncheckedSignedUnit<H, D, MK::Signature>) {
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone()));
let unit = unit.unpack();
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into()));

if unit.as_signable().creator() == self.index() {
trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash());
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit));
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into()));
}
}

11 changes: 8 additions & 3 deletions consensus/src/testing/crash_recovery.rs
@@ -1,9 +1,9 @@
use crate::{
testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender},
units::{UncheckedSignedUnit, Unit, UnitCoord},
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
NodeCount, NodeIndex, NodeMap, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner};
use aleph_bft_mock::{Data, Hash64, Hasher64, Router, Signature, Spawner};
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -129,7 +129,12 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet<UnitCoord> {
let mut already_saved = HashSet::new();

while !buf.is_empty() {
let unit = UncheckedSignedUnit::<Hasher64, Data, Signature>::decode(buf).unwrap();
let unit = <(
UncheckedSignedUnit<Hasher64, Data, Signature>,
NodeMap<Hash64>,
)>::decode(buf)
.unwrap()
.0;
let full_unit = unit.as_signable();
let coord = full_unit.coord();
let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask;