Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-4006: Unit reconstruction refactor #418

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 75 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.34.0"
version = "0.34.1"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
52 changes: 23 additions & 29 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use futures::{
future::pending,
FutureExt,
};
use log::{debug, error, warn};
use log::{debug, error};

use crate::{
config::Config,
creation,
extension::Service as Extender,
handle_task_termination,
reconstruction::Service as ReconstructionService,
runway::{NotificationIn, NotificationOut},
terminal::Terminal,
Hasher, Receiver, Round, Sender, SpawnHandle, Terminator,
};

Expand All @@ -37,12 +37,12 @@ pub(crate) async fn run<H: Hasher + 'static>(
})
.fuse();

let (parents_for_creator, parents_from_terminal) = mpsc::unbounded();
let (parents_for_creator, parents_from_dag) = mpsc::unbounded();

let creator_terminator = terminator.add_offspring_connection("creator");
let io = creation::IO {
outgoing_units: outgoing_notifications.clone(),
incoming_parents: parents_from_terminal,
incoming_parents: parents_from_dag,
};
let creator_handle = spawn_handle
.spawn_essential(
Expand All @@ -58,37 +58,25 @@ pub(crate) async fn run<H: Hasher + 'static>(
pending().await
};

let mut terminal = Terminal::new(index, incoming_notifications, outgoing_notifications);
let reconstruction = ReconstructionService::new(
incoming_notifications,
outgoing_notifications,
parents_for_creator,
electors_tx,
);

// send a new parent candidate to the creator
let mut parents_for_creator = Some(parents_for_creator);
terminal.register_post_insert_hook(Box::new(move |u| {
if let Some(parents_for_creator_tx) = &parents_for_creator {
if parents_for_creator_tx.unbounded_send(u.into()).is_err() {
warn!(target: "AlephBFT", "Channel to creator was closed.");
parents_for_creator = None;
}
}
}));
// try to extend the partial order after adding a unit to the dag
terminal.register_post_insert_hook(Box::new(move |u| {
electors_tx
.unbounded_send(u.into())
.expect("Channel to extender should be open.")
}));

let terminal_terminator = terminator.add_offspring_connection("terminal");
let mut terminal_handle = spawn_handle
.spawn_essential("consensus/terminal", async move {
terminal.run(terminal_terminator).await
let reconstruction_terminator = terminator.add_offspring_connection("reconstruction");
let mut reconstruction_handle = spawn_handle
.spawn_essential("consensus/reconstruction", async move {
reconstruction.run(reconstruction_terminator).await
})
.fuse();
debug!(target: "AlephBFT", "{:?} All services started.", index);

futures::select! {
_ = terminator.get_exit().fuse() => {},
_ = terminal_handle => {
debug!(target: "AlephBFT-consensus", "{:?} terminal task terminated early.", index);
_ = reconstruction_handle => {
debug!(target: "AlephBFT-consensus", "{:?} reconstruction task terminated early.", index);
fixxxedpoint marked this conversation as resolved.
Show resolved Hide resolved
},
_ = creator_panic_handle.fuse() => {
error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index);
Expand All @@ -102,7 +90,13 @@ pub(crate) async fn run<H: Hasher + 'static>(
// we stop no matter if received Ok or Err
terminator.terminate_sync().await;

handle_task_termination(terminal_handle, "AlephBFT-consensus", "Terminal", index).await;
handle_task_termination(
reconstruction_handle,
"AlephBFT-consensus",
"Reconstruction",
index,
)
.await;
handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await;
handle_task_termination(extender_handle, "AlephBFT-consensus", "Extender", index).await;

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ mod creation;
mod extension;
mod member;
mod network;
mod reconstruction;
mod runway;
mod terminal;
mod terminator;
mod units;

Expand Down
229 changes: 229 additions & 0 deletions consensus/src/reconstruction/dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use crate::{reconstruction::ReconstructedUnit, Hasher};
use std::collections::{HashMap, HashSet};

struct OrphanedUnit<H: Hasher> {
unit: ReconstructedUnit<H>,
missing_parents: HashSet<H::Hash>,
}

impl<H: Hasher> OrphanedUnit<H> {
/// If there are no missing parents then returns just the internal unit.
pub fn new(
unit: ReconstructedUnit<H>,
missing_parents: HashSet<H::Hash>,
) -> Result<Self, ReconstructedUnit<H>> {
match missing_parents.is_empty() {
true => Err(unit),
false => Ok(OrphanedUnit {
unit,
missing_parents,
}),
}
}

/// If this was the last missing parent return the reconstructed unit.
pub fn resolve_parent(self, parent: H::Hash) -> Result<ReconstructedUnit<H>, Self> {
let OrphanedUnit {
unit,
mut missing_parents,
} = self;
missing_parents.remove(&parent);
match missing_parents.is_empty() {
true => Ok(unit),
false => Err(OrphanedUnit {
unit,
missing_parents,
}),
}
}

/// The hash of the unit.
pub fn hash(&self) -> H::Hash {
self.unit.hash()
}

/// The set of still missing parents.
pub fn missing_parents(&self) -> &HashSet<H::Hash> {
&self.missing_parents
}
}

/// A structure ensuring that units added to it are output in an order
/// in agreement with the DAG order.
/// TODO: This should likely be the final destination of units going through a pipeline.
/// This requires quite a bit more of a refactor.
pub struct Dag<H: Hasher> {
orphaned_units: HashMap<H::Hash, OrphanedUnit<H>>,
waiting_for: HashMap<H::Hash, Vec<H::Hash>>,
dag_units: HashMap<H::Hash, ReconstructedUnit<H>>,
}

impl<H: Hasher> Dag<H> {
/// Create a new empty DAG.
pub fn new() -> Self {
Dag {
orphaned_units: HashMap::new(),
waiting_for: HashMap::new(),
dag_units: HashMap::new(),
}
}

fn move_to_dag(&mut self, unit: ReconstructedUnit<H>) -> Vec<ReconstructedUnit<H>> {
let unit_hash = unit.hash();
self.dag_units.insert(unit_hash, unit.clone());
// Start with this unit, so that the units are in order at the end.
let mut result = vec![unit];
for child in self.waiting_for.remove(&unit_hash).iter().flatten() {
match self
.orphaned_units
.remove(child)
.expect("we were waiting for parents")
.resolve_parent(unit_hash)
{
Ok(unit) => result.append(&mut self.move_to_dag(unit)),
fixxxedpoint marked this conversation as resolved.
Show resolved Hide resolved
Err(orphan) => {
self.orphaned_units.insert(*child, orphan);
}
}
}
result
}

/// Add a unit to the Dag. Returns all the units that now have all their parents in the Dag,
/// in an order agreeing with the Dag structure.
pub fn add_unit(&mut self, unit: ReconstructedUnit<H>) -> Vec<ReconstructedUnit<H>> {
if self.dag_units.contains_key(&unit.hash()) {
// Deduplicate.
return Vec::new();
}
let missing_parents = unit
.parents()
.values()
.filter(|parent| !self.dag_units.contains_key(parent))
.cloned()
.collect();
match OrphanedUnit::new(unit, missing_parents) {
Ok(orphan) => {
let unit_hash = orphan.hash();
for parent in orphan.missing_parents() {
self.waiting_for.entry(*parent).or_default().push(unit_hash);
}
self.orphaned_units.insert(unit_hash, orphan);
Vec::new()
}
Err(unit) => self.move_to_dag(unit),
}
}
}

#[cfg(test)]
mod test {
use crate::{
reconstruction::{dag::Dag, ReconstructedUnit},
units::tests::{random_full_parent_units_up_to, TestFullUnit},
Hasher, NodeCount, NodeIndex, NodeMap,
};
use aleph_bft_mock::Hasher64;
use std::collections::HashSet;

fn full_parents_to_map(
parents: Vec<<Hasher64 as Hasher>::Hash>,
) -> NodeMap<<Hasher64 as Hasher>::Hash> {
let mut result = NodeMap::with_size(NodeCount(parents.len()));
for (id, parent) in parents.into_iter().enumerate() {
result.insert(NodeIndex(id), parent);
}
result
}

// silly clippy, the map below doesn't work with &[..]
#[allow(clippy::ptr_arg)]
fn unit_hashes(units: &Vec<TestFullUnit>) -> Vec<<Hasher64 as Hasher>::Hash> {
units.iter().map(|unit| unit.hash()).collect()
}

fn reconstructed(dag: Vec<Vec<TestFullUnit>>) -> Vec<Vec<ReconstructedUnit<Hasher64>>> {
let hashes: Vec<_> = dag.iter().map(unit_hashes).collect();
let initial_units: Vec<_> = dag
.get(0)
.expect("only called on nonempty dags")
.iter()
.map(|unit| ReconstructedUnit::initial(unit.unit()))
.collect();
let mut result = vec![initial_units];
for (units, parents) in dag.iter().skip(1).zip(hashes) {
let parents = full_parents_to_map(parents);
let reconstructed = units
.iter()
.map(|unit| {
ReconstructedUnit::with_parents(unit.unit(), parents.clone())
.expect("parents are correct")
})
.collect();
result.push(reconstructed);
}
result
}

#[test]
fn reconstructs_initial_units() {
let mut dag = Dag::new();
for unit in reconstructed(random_full_parent_units_up_to(0, NodeCount(4)))
.pop()
.expect("we have initial units")
{
let reconstructed = dag.add_unit(unit.clone());
assert_eq!(reconstructed, vec![unit]);
}
}

#[test]
fn reconstructs_units_in_order() {
let mut dag = Dag::new();
for units in reconstructed(random_full_parent_units_up_to(7, NodeCount(4))) {
for unit in units {
let reconstructed = dag.add_unit(unit.clone());
assert_eq!(reconstructed, vec![unit]);
}
}
}

#[test]
fn reconstructs_units_in_reverse_order() {
let full_unit_dag = random_full_parent_units_up_to(7, NodeCount(4));
let mut hash_batches: Vec<_> = full_unit_dag
.iter()
.map(unit_hashes)
.map(HashSet::from_iter)
.collect();
hash_batches.reverse();
let mut unit_dag = reconstructed(full_unit_dag);
unit_dag.reverse();
let mut initial_units = unit_dag.pop().expect("initial units are there");
let mut dag = Dag::new();
for units in unit_dag {
for unit in units {
let reconstructed = dag.add_unit(unit.clone());
assert!(reconstructed.is_empty());
}
}
let last_initial_unit = initial_units.pop().expect("there is an initial unit");
for unit in initial_units {
let reconstructed = dag.add_unit(unit.clone());
let mut current_round_hashes: HashSet<_> = hash_batches.pop().expect("we are not done");
assert!(current_round_hashes.remove(&unit.hash()));
if !current_round_hashes.is_empty() {
hash_batches.push(current_round_hashes);
}
assert_eq!(reconstructed, vec![unit]);
}
for unit in dag.add_unit(last_initial_unit) {
let mut current_round_hashes = hash_batches.pop().expect("we are not done");
assert!(current_round_hashes.remove(&unit.hash()));
if !current_round_hashes.is_empty() {
hash_batches.push(current_round_hashes);
}
}
assert!(hash_batches.is_empty());
}
}
Loading
Loading