Skip to content

Commit

Permalink
A0-4006: Unit reconstruction refactor (#418)
Browse files Browse the repository at this point in the history
* Unit reconstruction refactor

* Remove recurrency and other review fixes

* Ofc forgot fmt...

* Exit strategy

* Remove redundant units
  • Loading branch information
timorleph authored Mar 11, 2024
1 parent 36638fd commit ad8a26c
Show file tree
Hide file tree
Showing 11 changed files with 1,081 additions and 551 deletions.
153 changes: 75 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.34.0"
version = "0.34.1"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
52 changes: 23 additions & 29 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use futures::{
future::pending,
FutureExt,
};
use log::{debug, error, warn};
use log::{debug, error};

use crate::{
config::Config,
creation,
extension::Service as Extender,
handle_task_termination,
reconstruction::Service as ReconstructionService,
runway::{NotificationIn, NotificationOut},
terminal::Terminal,
Hasher, Receiver, Round, Sender, SpawnHandle, Terminator,
};

Expand All @@ -37,12 +37,12 @@ pub(crate) async fn run<H: Hasher + 'static>(
})
.fuse();

let (parents_for_creator, parents_from_terminal) = mpsc::unbounded();
let (parents_for_creator, parents_from_dag) = mpsc::unbounded();

let creator_terminator = terminator.add_offspring_connection("creator");
let io = creation::IO {
outgoing_units: outgoing_notifications.clone(),
incoming_parents: parents_from_terminal,
incoming_parents: parents_from_dag,
};
let creator_handle = spawn_handle
.spawn_essential(
Expand All @@ -58,37 +58,25 @@ pub(crate) async fn run<H: Hasher + 'static>(
pending().await
};

let mut terminal = Terminal::new(index, incoming_notifications, outgoing_notifications);
let reconstruction = ReconstructionService::new(
incoming_notifications,
outgoing_notifications,
parents_for_creator,
electors_tx,
);

// send a new parent candidate to the creator
let mut parents_for_creator = Some(parents_for_creator);
terminal.register_post_insert_hook(Box::new(move |u| {
if let Some(parents_for_creator_tx) = &parents_for_creator {
if parents_for_creator_tx.unbounded_send(u.into()).is_err() {
warn!(target: "AlephBFT", "Channel to creator was closed.");
parents_for_creator = None;
}
}
}));
// try to extend the partial order after adding a unit to the dag
terminal.register_post_insert_hook(Box::new(move |u| {
electors_tx
.unbounded_send(u.into())
.expect("Channel to extender should be open.")
}));

let terminal_terminator = terminator.add_offspring_connection("terminal");
let mut terminal_handle = spawn_handle
.spawn_essential("consensus/terminal", async move {
terminal.run(terminal_terminator).await
let reconstruction_terminator = terminator.add_offspring_connection("reconstruction");
let mut reconstruction_handle = spawn_handle
.spawn_essential("consensus/reconstruction", async move {
reconstruction.run(reconstruction_terminator).await
})
.fuse();
debug!(target: "AlephBFT", "{:?} All services started.", index);

futures::select! {
_ = terminator.get_exit().fuse() => {},
_ = terminal_handle => {
debug!(target: "AlephBFT-consensus", "{:?} terminal task terminated early.", index);
_ = reconstruction_handle => {
debug!(target: "AlephBFT-consensus", "{:?} unit reconstruction task terminated early.", index);
},
_ = creator_panic_handle.fuse() => {
error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index);
Expand All @@ -102,7 +90,13 @@ pub(crate) async fn run<H: Hasher + 'static>(
// we stop no matter if received Ok or Err
terminator.terminate_sync().await;

handle_task_termination(terminal_handle, "AlephBFT-consensus", "Terminal", index).await;
handle_task_termination(
reconstruction_handle,
"AlephBFT-consensus",
"Reconstruction",
index,
)
.await;
handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await;
handle_task_termination(extender_handle, "AlephBFT-consensus", "Extender", index).await;

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ mod creation;
mod extension;
mod member;
mod network;
mod reconstruction;
mod runway;
mod terminal;
mod terminator;
mod units;

Expand Down
232 changes: 232 additions & 0 deletions consensus/src/reconstruction/dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
use crate::{reconstruction::ReconstructedUnit, Hasher};
use std::collections::{HashMap, HashSet, VecDeque};

struct OrphanedUnit<H: Hasher> {
unit: ReconstructedUnit<H>,
missing_parents: HashSet<H::Hash>,
}

impl<H: Hasher> OrphanedUnit<H> {
/// If there are no missing parents then returns just the internal unit.
pub fn new(
unit: ReconstructedUnit<H>,
missing_parents: HashSet<H::Hash>,
) -> Result<Self, ReconstructedUnit<H>> {
match missing_parents.is_empty() {
true => Err(unit),
false => Ok(OrphanedUnit {
unit,
missing_parents,
}),
}
}

/// If this was the last missing parent return the reconstructed unit.
pub fn resolve_parent(self, parent: H::Hash) -> Result<ReconstructedUnit<H>, Self> {
let OrphanedUnit {
unit,
mut missing_parents,
} = self;
missing_parents.remove(&parent);
match missing_parents.is_empty() {
true => Ok(unit),
false => Err(OrphanedUnit {
unit,
missing_parents,
}),
}
}

/// The hash of the unit.
pub fn hash(&self) -> H::Hash {
self.unit.hash()
}

/// The set of still missing parents.
pub fn missing_parents(&self) -> &HashSet<H::Hash> {
&self.missing_parents
}
}

/// A structure ensuring that units added to it are output in an order
/// in agreement with the DAG order.
/// TODO: This should likely be the final destination of units going through a pipeline.
/// This requires quite a bit more of a refactor.
pub struct Dag<H: Hasher> {
orphaned_units: HashMap<H::Hash, OrphanedUnit<H>>,
waiting_for: HashMap<H::Hash, Vec<H::Hash>>,
dag_units: HashSet<H::Hash>,
}

impl<H: Hasher> Dag<H> {
/// Create a new empty DAG.
pub fn new() -> Self {
Dag {
orphaned_units: HashMap::new(),
waiting_for: HashMap::new(),
dag_units: HashSet::new(),
}
}

fn move_to_dag(&mut self, unit: ReconstructedUnit<H>) -> Vec<ReconstructedUnit<H>> {
let mut result = Vec::new();
let mut ready_units = VecDeque::from([unit]);
while let Some(unit) = ready_units.pop_front() {
let unit_hash = unit.hash();
self.dag_units.insert(unit_hash);
result.push(unit);
for child in self.waiting_for.remove(&unit_hash).iter().flatten() {
match self
.orphaned_units
.remove(child)
.expect("we were waiting for parents")
.resolve_parent(unit_hash)
{
Ok(unit) => ready_units.push_back(unit),
Err(orphan) => {
self.orphaned_units.insert(*child, orphan);
}
}
}
}
result
}

/// Add a unit to the Dag. Returns all the units that now have all their parents in the Dag,
/// in an order agreeing with the Dag structure.
pub fn add_unit(&mut self, unit: ReconstructedUnit<H>) -> Vec<ReconstructedUnit<H>> {
if self.dag_units.contains(&unit.hash()) {
// Deduplicate.
return Vec::new();
}
let missing_parents = unit
.parents()
.values()
.filter(|parent| !self.dag_units.contains(parent))
.cloned()
.collect();
match OrphanedUnit::new(unit, missing_parents) {
Ok(orphan) => {
let unit_hash = orphan.hash();
for parent in orphan.missing_parents() {
self.waiting_for.entry(*parent).or_default().push(unit_hash);
}
self.orphaned_units.insert(unit_hash, orphan);
Vec::new()
}
Err(unit) => self.move_to_dag(unit),
}
}
}

#[cfg(test)]
mod test {
use crate::{
reconstruction::{dag::Dag, ReconstructedUnit},
units::tests::{random_full_parent_units_up_to, TestFullUnit},
Hasher, NodeCount, NodeIndex, NodeMap,
};
use aleph_bft_mock::Hasher64;
use std::collections::HashSet;

fn full_parents_to_map(
parents: Vec<<Hasher64 as Hasher>::Hash>,
) -> NodeMap<<Hasher64 as Hasher>::Hash> {
let mut result = NodeMap::with_size(NodeCount(parents.len()));
for (id, parent) in parents.into_iter().enumerate() {
result.insert(NodeIndex(id), parent);
}
result
}

// silly clippy, the map below doesn't work with &[..]
#[allow(clippy::ptr_arg)]
fn unit_hashes(units: &Vec<TestFullUnit>) -> Vec<<Hasher64 as Hasher>::Hash> {
units.iter().map(|unit| unit.hash()).collect()
}

fn reconstructed(dag: Vec<Vec<TestFullUnit>>) -> Vec<Vec<ReconstructedUnit<Hasher64>>> {
let hashes: Vec<_> = dag.iter().map(unit_hashes).collect();
let initial_units: Vec<_> = dag
.get(0)
.expect("only called on nonempty dags")
.iter()
.map(|unit| ReconstructedUnit::initial(unit.unit()))
.collect();
let mut result = vec![initial_units];
for (units, parents) in dag.iter().skip(1).zip(hashes) {
let parents = full_parents_to_map(parents);
let reconstructed = units
.iter()
.map(|unit| {
ReconstructedUnit::with_parents(unit.unit(), parents.clone())
.expect("parents are correct")
})
.collect();
result.push(reconstructed);
}
result
}

#[test]
fn reconstructs_initial_units() {
let mut dag = Dag::new();
for unit in reconstructed(random_full_parent_units_up_to(0, NodeCount(4)))
.pop()
.expect("we have initial units")
{
let reconstructed = dag.add_unit(unit.clone());
assert_eq!(reconstructed, vec![unit]);
}
}

#[test]
fn reconstructs_units_in_order() {
let mut dag = Dag::new();
for units in reconstructed(random_full_parent_units_up_to(7000, NodeCount(4))) {
for unit in units {
let reconstructed = dag.add_unit(unit.clone());
assert_eq!(reconstructed, vec![unit]);
}
}
}

#[test]
fn reconstructs_units_in_reverse_order() {
let full_unit_dag = random_full_parent_units_up_to(7000, NodeCount(4));
let mut hash_batches: Vec<_> = full_unit_dag
.iter()
.map(unit_hashes)
.map(HashSet::from_iter)
.collect();
hash_batches.reverse();
let mut unit_dag = reconstructed(full_unit_dag);
unit_dag.reverse();
let mut initial_units = unit_dag.pop().expect("initial units are there");
let mut dag = Dag::new();
for units in unit_dag {
for unit in units {
let reconstructed = dag.add_unit(unit.clone());
assert!(reconstructed.is_empty());
}
}
let last_initial_unit = initial_units.pop().expect("there is an initial unit");
for unit in initial_units {
let reconstructed = dag.add_unit(unit.clone());
let mut current_round_hashes: HashSet<_> = hash_batches.pop().expect("we are not done");
assert!(current_round_hashes.remove(&unit.hash()));
if !current_round_hashes.is_empty() {
hash_batches.push(current_round_hashes);
}
assert_eq!(reconstructed, vec![unit]);
}
for unit in dag.add_unit(last_initial_unit) {
let mut current_round_hashes = hash_batches.pop().expect("we are not done");
assert!(current_round_hashes.remove(&unit.hash()));
if !current_round_hashes.is_empty() {
hash_batches.push(current_round_hashes);
}
}
assert!(hash_batches.is_empty());
}
}
Loading

0 comments on commit ad8a26c

Please sign in to comment.