From 2fd82e696b08ea770c2568b0a094054d75e8ee2e Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 28 Feb 2024 11:49:23 +0100 Subject: [PATCH 1/5] Unit reconstruction refactor --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/consensus.rs | 52 ++- consensus/src/lib.rs | 2 +- consensus/src/reconstruction/dag.rs | 229 ++++++++++++ consensus/src/reconstruction/mod.rs | 211 +++++++++++ consensus/src/reconstruction/parents.rs | 467 ++++++++++++++++++++++++ consensus/src/terminal.rs | 397 -------------------- consensus/src/testing/dag.rs | 2 +- consensus/src/units/mod.rs | 94 +++-- docs/src/internals.md | 36 +- 11 files changed, 1020 insertions(+), 474 deletions(-) create mode 100644 consensus/src/reconstruction/dag.rs create mode 100644 consensus/src/reconstruction/mod.rs create mode 100644 consensus/src/reconstruction/parents.rs delete mode 100644 consensus/src/terminal.rs diff --git a/Cargo.lock b/Cargo.lock index 97271385..a7fcf586 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.33.1" +version = "0.33.3" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 34a30857..f888c827 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.33.2" +version = "0.33.3" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 56c0c4dd..4d447bff 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -3,15 +3,15 @@ use futures::{ future::pending, FutureExt, }; -use log::{debug, error, warn}; +use log::{debug, error}; use crate::{ config::Config, creation, extension::Service as Extender, handle_task_termination, + reconstruction::Service as ReconstructionService, runway::{NotificationIn, NotificationOut}, - 
terminal::Terminal, Hasher, Receiver, Round, Sender, SpawnHandle, Terminator, }; @@ -37,12 +37,12 @@ pub(crate) async fn run( }) .fuse(); - let (parents_for_creator, parents_from_terminal) = mpsc::unbounded(); + let (parents_for_creator, parents_from_dag) = mpsc::unbounded(); let creator_terminator = terminator.add_offspring_connection("creator"); let io = creation::IO { outgoing_units: outgoing_notifications.clone(), - incoming_parents: parents_from_terminal, + incoming_parents: parents_from_dag, }; let creator_handle = spawn_handle .spawn_essential( @@ -58,37 +58,25 @@ pub(crate) async fn run( pending().await }; - let mut terminal = Terminal::new(index, incoming_notifications, outgoing_notifications); + let reconstruction = ReconstructionService::new( + incoming_notifications, + outgoing_notifications, + parents_for_creator, + electors_tx, + ); - // send a new parent candidate to the creator - let mut parents_for_creator = Some(parents_for_creator); - terminal.register_post_insert_hook(Box::new(move |u| { - if let Some(parents_for_creator_tx) = &parents_for_creator { - if parents_for_creator_tx.unbounded_send(u.into()).is_err() { - warn!(target: "AlephBFT", "Channel to creator was closed."); - parents_for_creator = None; - } - } - })); - // try to extend the partial order after adding a unit to the dag - terminal.register_post_insert_hook(Box::new(move |u| { - electors_tx - .unbounded_send(u.into()) - .expect("Channel to extender should be open.") - })); - - let terminal_terminator = terminator.add_offspring_connection("terminal"); - let mut terminal_handle = spawn_handle - .spawn_essential("consensus/terminal", async move { - terminal.run(terminal_terminator).await + let reconstruction_terminator = terminator.add_offspring_connection("reconstruction"); + let mut reconstruction_handle = spawn_handle + .spawn_essential("consensus/reconstruction", async move { + reconstruction.run(reconstruction_terminator).await }) .fuse(); debug!(target: "AlephBFT", "{:?} All 
services started.", index); futures::select! { _ = terminator.get_exit().fuse() => {}, - _ = terminal_handle => { - debug!(target: "AlephBFT-consensus", "{:?} terminal task terminated early.", index); + _ = reconstruction_handle => { + debug!(target: "AlephBFT-consensus", "{:?} reconstruction task terminated early.", index); }, _ = creator_panic_handle.fuse() => { error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index); @@ -102,7 +90,13 @@ pub(crate) async fn run( // we stop no matter if received Ok or Err terminator.terminate_sync().await; - handle_task_termination(terminal_handle, "AlephBFT-consensus", "Terminal", index).await; + handle_task_termination( + reconstruction_handle, + "AlephBFT-consensus", + "Reconstruction", + index, + ) + .await; handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await; handle_task_termination(extender_handle, "AlephBFT-consensus", "Extender", index).await; diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f61fa902..704c9e98 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -9,8 +9,8 @@ mod creation; mod extension; mod member; mod network; +mod reconstruction; mod runway; -mod terminal; mod terminator; mod units; diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/reconstruction/dag.rs new file mode 100644 index 00000000..f96f03b3 --- /dev/null +++ b/consensus/src/reconstruction/dag.rs @@ -0,0 +1,229 @@ +use crate::{reconstruction::ReconstructedUnit, Hasher}; +use std::collections::{HashMap, HashSet}; + +struct OrphanedUnit { + unit: ReconstructedUnit, + missing_parents: HashSet, +} + +impl OrphanedUnit { + /// If there are no missing parents then returns just the internal unit. 
+ pub fn new( + unit: ReconstructedUnit, + missing_parents: HashSet, + ) -> Result> { + match missing_parents.is_empty() { + true => Err(unit), + false => Ok(OrphanedUnit { + unit, + missing_parents, + }), + } + } + + /// If this was the last missing parent return the reconstructed unit. + pub fn resolve_parent(self, parent: H::Hash) -> Result, Self> { + let OrphanedUnit { + unit, + mut missing_parents, + } = self; + missing_parents.remove(&parent); + match missing_parents.is_empty() { + true => Ok(unit), + false => Err(OrphanedUnit { + unit, + missing_parents, + }), + } + } + + /// The hash of the unit. + pub fn hash(&self) -> H::Hash { + self.unit.hash() + } + + /// The set of still missing parents. + pub fn missing_parents(&self) -> &HashSet { + &self.missing_parents + } +} + +/// A structure ensuring that units added to it are output in an order +/// in agreement with the DAG order. +/// TODO: This should likely be the final destination of units going through a pipeline. +/// This requires quite a bit more of a refactor. +pub struct Dag { + orphaned_units: HashMap>, + waiting_for: HashMap>, + dag_units: HashMap>, +} + +impl Dag { + /// Create a new empty DAG. + pub fn new() -> Self { + Dag { + orphaned_units: HashMap::new(), + waiting_for: HashMap::new(), + dag_units: HashMap::new(), + } + } + + fn move_to_dag(&mut self, unit: ReconstructedUnit) -> Vec> { + let unit_hash = unit.hash(); + self.dag_units.insert(unit_hash, unit.clone()); + // Start with this unit, so that the units are in order at the end. + let mut result = vec![unit]; + for child in self.waiting_for.remove(&unit_hash).iter().flatten() { + match self + .orphaned_units + .remove(child) + .expect("we were waiting for parents") + .resolve_parent(unit_hash) + { + Ok(unit) => result.append(&mut self.move_to_dag(unit)), + Err(orphan) => { + self.orphaned_units.insert(*child, orphan); + } + } + } + result + } + + /// Add a unit to the Dag. 
Returns all the units that now have all their parents in the Dag, + /// in an order agreeing with the Dag structure. + pub fn add_unit(&mut self, unit: ReconstructedUnit) -> Vec> { + if self.dag_units.contains_key(&unit.hash()) { + // Deduplicate. + return Vec::new(); + } + let missing_parents = unit + .parents() + .values() + .filter(|parent| !self.dag_units.contains_key(parent)) + .cloned() + .collect(); + match OrphanedUnit::new(unit, missing_parents) { + Ok(orphan) => { + let unit_hash = orphan.hash(); + for parent in orphan.missing_parents() { + self.waiting_for.entry(*parent).or_default().push(unit_hash); + } + self.orphaned_units.insert(unit_hash, orphan); + Vec::new() + } + Err(unit) => self.move_to_dag(unit), + } + } +} + +#[cfg(test)] +mod test { + use crate::{ + reconstruction::{dag::Dag, ReconstructedUnit}, + units::tests::{random_full_parent_units_up_to, TestFullUnit}, + Hasher, NodeCount, NodeIndex, NodeMap, + }; + use aleph_bft_mock::Hasher64; + use std::collections::HashSet; + + fn full_parents_to_map( + parents: Vec<::Hash>, + ) -> NodeMap<::Hash> { + let mut result = NodeMap::with_size(NodeCount(parents.len())); + for (id, parent) in parents.into_iter().enumerate() { + result.insert(NodeIndex(id), parent); + } + result + } + + // silly clippy, the map below doesn't work with &[..] 
+ #[allow(clippy::ptr_arg)] + fn unit_hashes(units: &Vec) -> Vec<::Hash> { + units.iter().map(|unit| unit.hash()).collect() + } + + fn reconstructed(dag: Vec>) -> Vec>> { + let hashes: Vec<_> = dag.iter().map(unit_hashes).collect(); + let initial_units: Vec<_> = dag + .get(0) + .expect("only called on nonempty dags") + .iter() + .map(|unit| ReconstructedUnit::initial(unit.unit())) + .collect(); + let mut result = vec![initial_units]; + for (units, parents) in dag.iter().skip(1).zip(hashes) { + let parents = full_parents_to_map(parents); + let reconstructed = units + .iter() + .map(|unit| { + ReconstructedUnit::with_parents(unit.unit(), parents.clone()) + .expect("parents are correct") + }) + .collect(); + result.push(reconstructed); + } + result + } + + #[test] + fn reconstructs_initial_units() { + let mut dag = Dag::new(); + for unit in reconstructed(random_full_parent_units_up_to(0, NodeCount(4))) + .pop() + .expect("we have initial units") + { + let reconstructed = dag.add_unit(unit.clone()); + assert_eq!(reconstructed, vec![unit]); + } + } + + #[test] + fn reconstructs_units_in_order() { + let mut dag = Dag::new(); + for units in reconstructed(random_full_parent_units_up_to(7, NodeCount(4))) { + for unit in units { + let reconstructed = dag.add_unit(unit.clone()); + assert_eq!(reconstructed, vec![unit]); + } + } + } + + #[test] + fn reconstructs_units_in_reverse_order() { + let full_unit_dag = random_full_parent_units_up_to(7, NodeCount(4)); + let mut hash_batches: Vec<_> = full_unit_dag + .iter() + .map(unit_hashes) + .map(HashSet::from_iter) + .collect(); + hash_batches.reverse(); + let mut unit_dag = reconstructed(full_unit_dag); + unit_dag.reverse(); + let mut initial_units = unit_dag.pop().expect("initial units are there"); + let mut dag = Dag::new(); + for units in unit_dag { + for unit in units { + let reconstructed = dag.add_unit(unit.clone()); + assert!(reconstructed.is_empty()); + } + } + let last_initial_unit = initial_units.pop().expect("there is an 
initial unit"); + for unit in initial_units { + let reconstructed = dag.add_unit(unit.clone()); + let mut current_round_hashes: HashSet<_> = hash_batches.pop().expect("we are not done"); + assert!(current_round_hashes.remove(&unit.hash())); + if !current_round_hashes.is_empty() { + hash_batches.push(current_round_hashes); + } + assert_eq!(reconstructed, vec![unit]); + } + for unit in dag.add_unit(last_initial_unit) { + let mut current_round_hashes = hash_batches.pop().expect("we are not done"); + assert!(current_round_hashes.remove(&unit.hash())); + if !current_round_hashes.is_empty() { + hash_batches.push(current_round_hashes); + } + } + assert!(hash_batches.is_empty()); + } +} diff --git a/consensus/src/reconstruction/mod.rs b/consensus/src/reconstruction/mod.rs new file mode 100644 index 00000000..74b13d29 --- /dev/null +++ b/consensus/src/reconstruction/mod.rs @@ -0,0 +1,211 @@ +use crate::{ + extension::ExtenderUnit, + runway::{NotificationIn, NotificationOut}, + units::{ControlHash, Unit}, + Hasher, NodeMap, Receiver, Sender, Terminator, +}; +use futures::{FutureExt, StreamExt}; +use log::{debug, warn}; + +mod dag; +mod parents; + +use dag::Dag; +use parents::{Reconstruction, ReconstructionResult, Request}; + +const LOG_TARGET: &str = "AlephBFT-reconstruction"; + +/// A unit with its parents represented explicitly. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ReconstructedUnit { + unit: Unit, + parents: NodeMap, +} + +impl ReconstructedUnit { + /// Returns a reconstructed unit if the parents agree with the hash, errors out otherwise. + pub fn with_parents(unit: Unit, parents: NodeMap) -> Result> { + match unit.control_hash().combined_hash == ControlHash::::combine_hashes(&parents) { + true => Ok(ReconstructedUnit { unit, parents }), + false => Err(unit), + } + } + + /// Reconstructs empty parents for a round 0 unit. + /// Assumes obviously incorrect units with wrong control hashes have been rejected earlier. 
+ /// Will panic if called for any other kind of unit. + pub fn initial(unit: Unit) -> Self { + let n_members = unit.control_hash().n_members(); + assert!(unit.round() == 0, "Only the zeroth unit can be initial."); + ReconstructedUnit { + unit, + parents: NodeMap::with_size(n_members), + } + } + + /// The reconstructed parents, guaranteed to be correct. + pub fn parents(&self) -> &NodeMap { + &self.parents + } + + /// The hash of the unit. + pub fn hash(&self) -> H::Hash { + self.unit.hash() + } + + fn unit(&self) -> Unit { + self.unit.clone() + } + + fn extender_unit(&self) -> ExtenderUnit { + ExtenderUnit::new( + self.unit.creator(), + self.unit.round(), + self.hash(), + self.parents.clone(), + ) + } +} + +/// The service responsible for reconstructing the structure of the Dag. +/// Receives units containing control hashes and eventually outputs versions +/// with explicit parents in an order conforming to the Dag order. +pub struct Service { + reconstruction: Reconstruction, + dag: Dag, + notifications_from_runway: Receiver>, + notifications_for_runway: Sender>, + units_for_creator: Sender>, + units_for_extender: Sender>, +} + +enum Output { + Unit(ReconstructedUnit), + Request(Request), +} + +impl Service { + /// Create a new reconstruction service with the provided IO channels. 
+ pub fn new( + notifications_from_runway: Receiver>, + notifications_for_runway: Sender>, + units_for_creator: Sender>, + units_for_extender: Sender>, + ) -> Self { + let reconstruction = Reconstruction::new(); + let dag = Dag::new(); + Service { + reconstruction, + dag, + notifications_from_runway, + notifications_for_runway, + units_for_creator, + units_for_extender, + } + } + + fn handle_reconstruction_result( + &mut self, + reconstruction_result: ReconstructionResult, + ) -> Vec> { + use Output::*; + let mut result = Vec::new(); + let (units, requests) = reconstruction_result.into(); + result.append(&mut requests.into_iter().map(Request).collect()); + for unit in units { + result.append(&mut self.dag.add_unit(unit).into_iter().map(Unit).collect()); + } + result + } + + fn handle_notification(&mut self, notification: NotificationIn) -> Vec> { + use NotificationIn::*; + let mut result = Vec::new(); + match notification { + NewUnits(units) => { + for unit in units { + let reconstruction_result = self.reconstruction.add_unit(unit); + result.append(&mut self.handle_reconstruction_result(reconstruction_result)); + } + } + UnitParents(unit, parents) => { + let reconstruction_result = self.reconstruction.add_parents(unit, parents); + result.append(&mut self.handle_reconstruction_result(reconstruction_result)) + } + } + result + } + + fn handle_output(&mut self, output: Output) -> bool { + use Output::*; + match output { + Request(request) => { + if self + .notifications_for_runway + .unbounded_send(request.into()) + .is_err() + { + warn!(target: LOG_TARGET, "Notification channel should be open."); + return false; + } + } + Unit(unit) => { + // TODO add other destinations + if self.units_for_creator.unbounded_send(unit.unit()).is_err() { + warn!(target: LOG_TARGET, "Creator channel should be open."); + return false; + } + if self + .units_for_extender + .unbounded_send(unit.extender_unit()) + .is_err() + { + warn!(target: LOG_TARGET, "Extender channel should be 
open."); + return false; + } + if self + .notifications_for_runway + .unbounded_send(NotificationOut::AddedToDag( + unit.hash(), + unit.parents().clone().into_values().collect(), + )) + .is_err() + { + warn!(target: LOG_TARGET, "Notification channel should be open."); + return false; + } + } + } + true + } + + /// Run the reconstruction service until terminated. + pub async fn run(mut self, mut terminator: Terminator) { + let mut exiting = false; + loop { + futures::select! { + n = self.notifications_from_runway.next() => match n { + Some(notification) => for output in self.handle_notification(notification) { + if !self.handle_output(output) { + exiting = true; + break; + } + }, + None => { + warn!(target: LOG_TARGET, "Notifications for reconstruction unexpectedly ended."); + exiting = true; + } + }, + _ = terminator.get_exit().fuse() => { + debug!(target: LOG_TARGET, "Received exit signal."); + exiting = true; + } + } + if exiting { + debug!(target: LOG_TARGET, "Reconstruction decided to exit."); + terminator.terminate_sync().await; + break; + } + } + } +} diff --git a/consensus/src/reconstruction/parents.rs b/consensus/src/reconstruction/parents.rs new file mode 100644 index 00000000..67f51d3f --- /dev/null +++ b/consensus/src/reconstruction/parents.rs @@ -0,0 +1,467 @@ +use crate::{ + reconstruction::ReconstructedUnit, + runway::NotificationOut, + units::{ControlHash, Unit, UnitCoord}, + Hasher, NodeIndex, NodeMap, +}; +use std::collections::{hash_map::Entry, HashMap}; + +/// A unit in the process of reconstructing its parents. +#[derive(Debug, PartialEq, Eq, Clone)] +enum ReconstructingUnit { + /// We are trying to optimistically reconstruct the unit from potential parents we get. + Reconstructing(Unit, NodeMap), + /// We are waiting for receiving an explicit list of unit parents. 
+ WaitingForParents(Unit), +} + +enum SingleParentReconstructionResult { + Reconstructed(ReconstructedUnit), + InProgress(ReconstructingUnit), + RequestParents(ReconstructingUnit), +} + +impl ReconstructingUnit { + /// Produces a new reconstructing unit and a list of coordinates of parents we need for the reconstruction. Will panic if called for units of round 0. + fn new(unit: Unit) -> (Self, Vec) { + let n_members = unit.control_hash().n_members(); + let round = unit.round(); + assert!( + round != 0, + "We should never try to reconstruct parents of a unit of round 0." + ); + let coords = unit + .control_hash() + .parents() + .map(|parent_id| UnitCoord::new(round - 1, parent_id)) + .collect(); + ( + ReconstructingUnit::Reconstructing(unit, NodeMap::with_size(n_members)), + coords, + ) + } + + fn reconstruct_parent( + self, + parent_id: NodeIndex, + parent_hash: H::Hash, + ) -> SingleParentReconstructionResult { + use ReconstructingUnit::*; + use SingleParentReconstructionResult::*; + match self { + Reconstructing(unit, mut parents) => { + parents.insert(parent_id, parent_hash); + match parents.item_count() == unit.control_hash().parents().count() { + // We have enough parents, just need to check the control hash matches. + true => match ReconstructedUnit::with_parents(unit, parents) { + Ok(unit) => Reconstructed(unit), + // If the control hash doesn't match we want to get an explicit list of parents. + Err(unit) => RequestParents(WaitingForParents(unit)), + }, + false => InProgress(Reconstructing(unit, parents)), + } + } + // If we are already waiting for explicit parents, ignore any resolved ones; this shouldn't really happen.
+ WaitingForParents(unit) => InProgress(WaitingForParents(unit)), + } + } + + fn control_hash(&self) -> &ControlHash { + use ReconstructingUnit::*; + match self { + Reconstructing(unit, _) | WaitingForParents(unit) => unit.control_hash(), + } + } + + fn as_unit(&self) -> &Unit { + use ReconstructingUnit::*; + match self { + Reconstructing(unit, _) | WaitingForParents(unit) => unit, + } + } + + fn into_unit(self) -> Unit { + use ReconstructingUnit::*; + match self { + Reconstructing(unit, _) | WaitingForParents(unit) => unit, + } + } + + fn with_parents(self, parent_hashes: Vec) -> Result, Self> { + let control_hash = self.control_hash(); + let mut parents = NodeMap::with_size(control_hash.n_members()); + for (parent_id, parent_hash) in control_hash.parents().zip(parent_hashes.into_iter()) { + parents.insert(parent_id, parent_hash); + } + ReconstructedUnit::with_parents(self.clone().into_unit(), parents).map_err(|_| self) + } +} + +/// What we need to request to reconstruct units. +#[derive(Debug, PartialEq, Eq)] +pub enum Request { + /// We need a unit at this coordinate. + Coord(UnitCoord), + /// We need the explicit list of parents for the unit identified by the hash. + /// This should only happen in the presence of forks, when optimistic reconstruction failed. + ParentsOf(H::Hash), +} + +impl From> for NotificationOut { + fn from(request: Request) -> Self { + use NotificationOut::*; + use Request::*; + match request { + // This is a tad weird, but should get better after the runway refactor. + Coord(coord) => MissingUnits(vec![coord]), + ParentsOf(unit) => WrongControlHash(unit), + } + } +} + +/// The result of a reconstruction attempt. Might contain multiple reconstructed units, +/// as well as requests for some data that is needed for further reconstruction. 
+#[derive(Debug, PartialEq, Eq)] +pub struct ReconstructionResult { + reconstructed_units: Vec>, + requests: Vec>, +} + +impl ReconstructionResult { + fn new() -> Self { + ReconstructionResult { + reconstructed_units: Vec::new(), + requests: Vec::new(), + } + } + + fn reconstructed(unit: ReconstructedUnit) -> Self { + ReconstructionResult { + reconstructed_units: vec![unit], + requests: Vec::new(), + } + } + + fn request(request: Request) -> Self { + ReconstructionResult { + reconstructed_units: Vec::new(), + requests: vec![request], + } + } + + fn add_unit(&mut self, unit: ReconstructedUnit) { + self.reconstructed_units.push(unit); + } + + fn add_request(&mut self, request: Request) { + self.requests.push(request); + } + + fn accumulate(&mut self, other: ReconstructionResult) { + let ReconstructionResult { + mut reconstructed_units, + mut requests, + } = other; + self.reconstructed_units.append(&mut reconstructed_units); + self.requests.append(&mut requests); + } +} + +impl From> for (Vec>, Vec>) { + fn from(result: ReconstructionResult) -> Self { + let ReconstructionResult { + reconstructed_units, + requests, + } = result; + (reconstructed_units, requests) + } +} + +/// Receives units with control hashes and reconstructs their parents. +pub struct Reconstruction { + reconstructing_units: HashMap>, + units_by_coord: HashMap, + waiting_for_coord: HashMap>, +} + +impl Reconstruction { + /// A new parent reconstruction widget. 
+ pub fn new() -> Self { + Reconstruction { + reconstructing_units: HashMap::new(), + units_by_coord: HashMap::new(), + waiting_for_coord: HashMap::new(), + } + } + + fn reconstruct_parent( + &mut self, + child_hash: H::Hash, + parent_id: NodeIndex, + parent_hash: H::Hash, + ) -> ReconstructionResult { + use SingleParentReconstructionResult::*; + match self.reconstructing_units.remove(&child_hash) { + Some(child) => match child.reconstruct_parent(parent_id, parent_hash) { + Reconstructed(unit) => ReconstructionResult::reconstructed(unit), + InProgress(unit) => { + self.reconstructing_units.insert(child_hash, unit); + ReconstructionResult::new() + } + RequestParents(unit) => { + let hash = unit.as_unit().hash(); + self.reconstructing_units.insert(child_hash, unit); + ReconstructionResult::request(Request::ParentsOf(hash)) + } + }, + // We might have reconstructed the unit through explicit parents if someone sent them to us for no reason, + // in which case we don't have it any more. + None => ReconstructionResult::new(), + } + } + + /// Add a unit and start reconstructing its parents. + pub fn add_unit(&mut self, unit: Unit) -> ReconstructionResult { + let mut result = ReconstructionResult::new(); + let unit_hash = unit.hash(); + if self.reconstructing_units.contains_key(&unit_hash) { + // We already received this unit once, no need to do anything. + return result; + } + let unit_coord = UnitCoord::new(unit.round(), unit.creator()); + // We place the unit in the coord map only if this is the first variant ever received. + // This is not crucial for correctness, but helps in clarity. + if let Entry::Vacant(entry) = self.units_by_coord.entry(unit_coord) { + entry.insert(unit_hash); + } + + if let Some(children) = self.waiting_for_coord.remove(&unit_coord) { + // We reconstruct the parent for each unit that waits for this coord. 
+ for child_hash in children { + result.accumulate(self.reconstruct_parent( + child_hash, + unit_coord.creator(), + unit_hash, + )); + } + } + match unit_coord.round() { + 0 => { + let unit = ReconstructedUnit::initial(unit); + result.add_unit(unit); + } + _ => { + let (unit, parent_coords) = ReconstructingUnit::new(unit); + self.reconstructing_units.insert(unit_hash, unit); + for parent_coord in parent_coords { + match self.units_by_coord.get(&parent_coord) { + Some(parent_hash) => result.accumulate(self.reconstruct_parent( + unit_hash, + parent_coord.creator(), + *parent_hash, + )), + None => { + self.waiting_for_coord + .entry(parent_coord) + .or_default() + .push(unit_hash); + result.add_request(Request::Coord(parent_coord)); + } + } + } + } + } + result + } + + /// Add an explicit list of a unit's parents, perhaps reconstructing it. + pub fn add_parents( + &mut self, + unit_hash: H::Hash, + parents: Vec, + ) -> ReconstructionResult { + // If we don't have the unit, just ignore this response.
+ match self.reconstructing_units.remove(&unit_hash) { + Some(unit) => match unit.with_parents(parents) { + Ok(unit) => ReconstructionResult::reconstructed(unit), + Err(unit) => { + self.reconstructing_units.insert(unit_hash, unit); + ReconstructionResult::new() + } + }, + None => ReconstructionResult::new(), + } + } +} + +#[cfg(test)] +mod test { + use crate::{ + reconstruction::{ + parents::{Reconstruction, Request}, + ReconstructedUnit, + }, + units::{ + tests::{random_full_parent_units_up_to, TestFullUnit}, + UnitCoord, + }, + NodeCount, NodeIndex, + }; + + #[test] + fn reconstructs_initial_units() { + let mut reconstruction = Reconstruction::new(); + for unit in &random_full_parent_units_up_to(0, NodeCount(4))[0] { + let unit = unit.unit(); + let (mut reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), 1); + let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit)); + assert_eq!(reconstructed_unit.parents().item_count(), 0); + } + } + + #[test] + fn reconstructs_units_coming_in_order() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(7, NodeCount(4)); + for units in &dag { + for unit in units { + let unit = unit.unit(); + let round = unit.round(); + let (mut reconstructed_units, requests) = + reconstruction.add_unit(unit.clone()).into(); + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), 1); + let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + match round { + 0 => { + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit)); + assert_eq!(reconstructed_unit.parents().item_count(), 0); + } + round => { + assert_eq!(reconstructed_unit.parents().item_count(), 4); + let parents: &Vec = dag + .get((round - 1) as usize) + .expect("the parents are there"); + for 
(parent, reconstructed_parent) in + parents.iter().zip(reconstructed_unit.parents().values()) + { + assert_eq!(&parent.hash(), reconstructed_parent); + } + } + } + } + } + } + + #[test] + fn requests_all_parents() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(1, NodeCount(4)); + let unit = dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit") + .unit(); + let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + assert!(reconstructed_units.is_empty()); + assert_eq!(requests.len(), 4); + } + + #[test] + fn requests_single_parent() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(1, NodeCount(4)); + for unit in dag.get(0).expect("just created").iter().skip(1) { + let unit = unit.unit(); + reconstruction.add_unit(unit.clone()); + } + let unit = dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit") + .unit(); + let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + assert!(reconstructed_units.is_empty()); + assert_eq!(requests.len(), 1); + assert_eq!( + requests.last().expect("just checked"), + &Request::Coord(UnitCoord::new(0, NodeIndex(0))) + ); + } + + #[test] + fn reconstructs_units_coming_in_reverse_order() { + let mut reconstruction = Reconstruction::new(); + let mut dag = random_full_parent_units_up_to(7, NodeCount(4)); + dag.reverse(); + for unit in dag.get(0).expect("we have the top units") { + let unit = unit.unit(); + let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + assert!(reconstructed_units.is_empty()); + assert_eq!(requests.len(), 4); + } + let mut total_reconstructed = 0; + for mut units in dag.into_iter().skip(1) { + let last_unit = units.pop().expect("we have the unit"); + for unit in units { + let unit = unit.unit(); + let (reconstructed_units, _) = reconstruction.add_unit(unit.clone()).into(); + total_reconstructed 
+= reconstructed_units.len(); + } + let unit = last_unit.unit(); + let (reconstructed_units, _) = reconstruction.add_unit(unit.clone()).into(); + total_reconstructed += reconstructed_units.len(); + assert!(reconstructed_units.len() >= 4); + } + assert_eq!(total_reconstructed, 4 * 8); + } + + #[test] + fn handles_bad_hash() { + let mut reconstruction = Reconstruction::new(); + let dag = random_full_parent_units_up_to(0, NodeCount(4)); + for unit in dag.get(0).expect("just created") { + let unit = unit.unit(); + reconstruction.add_unit(unit.clone()); + } + let other_dag = random_full_parent_units_up_to(1, NodeCount(4)); + let unit = other_dag + .get(1) + .expect("just created") + .last() + .expect("we have a unit") + .unit(); + let unit_hash = unit.hash(); + let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); + assert!(reconstructed_units.is_empty()); + assert_eq!(requests.len(), 1); + assert_eq!( + requests.last().expect("just checked"), + &Request::ParentsOf(unit_hash), + ); + let parent_hashes: Vec<_> = other_dag + .get(0) + .expect("other dag has initial units") + .iter() + .map(|unit| unit.hash()) + .collect(); + let (mut reconstructed_units, requests) = reconstruction + .add_parents(unit_hash, parent_hashes.clone()) + .into(); + assert!(requests.is_empty()); + assert_eq!(reconstructed_units.len(), 1); + let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); + assert_eq!(reconstructed_unit.parents().item_count(), 4); + for (parent, reconstructed_parent) in parent_hashes + .iter() + .zip(reconstructed_unit.parents().values()) + { + assert_eq!(parent, reconstructed_parent); + } + } +} diff --git a/consensus/src/terminal.rs b/consensus/src/terminal.rs deleted file mode 100644 index 212f1962..00000000 --- a/consensus/src/terminal.rs +++ /dev/null @@ -1,397 +0,0 @@ -use futures::{FutureExt, StreamExt}; -use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, - fmt::{Debug, Formatter}, -}; - -use 
crate::{ - extension::ExtenderUnit, - runway::{NotificationIn, NotificationOut}, - units::{ControlHash, Unit, UnitCoord}, - Hasher, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Terminator, -}; -use codec::{Decode, Encode}; -use log::{debug, trace, warn}; - -/// An enum describing the status of a Unit in the Terminal pipeline. -#[derive(Clone, Eq, PartialEq, Hash, Debug, Decode, Encode)] -pub enum UnitStatus { - ReconstructingParents, - WrongControlHash, - WaitingParentsInDag, - InDag, -} - -/// A Unit struct used in the Terminal. It stores a copy of a unit and apart from that some -/// information on its status, i.e., already reconstructed parents etc. -#[derive(Clone, Eq, PartialEq)] -pub struct TerminalUnit { - unit: Unit, - // This represents the knowledge of what we think the parents of the unit are. It is initialized as all None - // and hashes of parents are being added at the time we receive a given coord. Once the parents map is complete - // we test whether the hash of parents agains the control_hash in the unit. If the check fails, we request hashes - // of parents of this unit via a NotificationOut. 
- parents: NodeMap, - n_miss_par_decoded: NodeCount, - n_miss_par_dag: NodeCount, - status: UnitStatus, -} - -impl Debug for TerminalUnit { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TerminalUnit") - .field("unit", &self.unit) - .field("parent count", &self.parents.size()) - .field("miss par decoded count", &self.n_miss_par_decoded) - .field("miss par dag count", &self.n_miss_par_dag) - .field("status", &self.status) - .finish() - } -} - -impl From> for ExtenderUnit { - fn from(u: TerminalUnit) -> ExtenderUnit { - ExtenderUnit::new(u.unit.creator(), u.unit.round(), u.unit.hash(), u.parents) - } -} - -impl From> for Unit { - fn from(u: TerminalUnit) -> Unit { - u.unit - } -} - -impl TerminalUnit { - // creates a unit from a Control Hash Unit, that has no parents reconstructed yet - pub(crate) fn blank_from_unit(unit: &Unit) -> Self { - let n_members = unit.control_hash().n_members(); - let n_parents = unit.control_hash().n_parents(); - TerminalUnit { - unit: unit.clone(), - parents: NodeMap::with_size(n_members), - n_miss_par_decoded: n_parents, - n_miss_par_dag: n_parents, - status: UnitStatus::ReconstructingParents, - } - } - - pub(crate) fn verify_control_hash(&self) -> bool { - // this will be called only after all parents have been reconstructed - - self.unit.control_hash().combined_hash == ControlHash::::combine_hashes(&self.parents) - } -} - -pub enum TerminalEvent { - ParentsReconstructed(H::Hash), - ParentsInDag(H::Hash), -} - -type SyncClosure = Box Y + Sync + Send + 'static>; - -/// A process whose goal is to receive new units and place them in our local Dag. -/// Importantly, our local Dag is a set of units that are *guaranteed* to be sooner or later -/// received by all honest nodes in the network. -/// The Terminal receives notifications from the ntfct_rx channel endpoint and pushes requests for units -/// and other notifications to the ntfct_tx channel endpoint. 
-/// The path of a single unit u through the Terminal is as follows: -/// 1) It is added to the unit_store. -/// 2) For each parent coord (r, ix) if we have a unit v with such a coord in store then we "reconstruct" this parent -/// by placing Some(v_hash), where v_hash is the hash of v in parents[ix] in the TerminalUnit object for u. -/// 3) For each parent coord (r, ix) that is missing in the store, we request this unit by sending a -/// suitable NotificationOut. Subsequently we wait for these units, whenever a unit of one of the -/// missing coords is received, the parents field of the TerminalUnit object of u is updated. -/// 4) At the moment when all parents have been reconstructed (which happens right away for round-0 units -/// that have no parents) an appropriate Event is generated and we check whether the reconstructed parents -/// list after hashing agrees with the control_hash in the unit. -/// a) If yes, then the unit u gets status WaitingParentsInDag -/// b) If no, then the unit gets status WrongControlHash and a notification is sent, requesting the list of parent -/// hashes of u. At the moment when a response to this notification is received (which contains correct parent -/// hashes of u), the parents field in the TerminalUnit object of u is updated accordingly, and the unit u -/// gets status WaitingParentsInDag as in a) -/// 5) Now we wait for all the parents of unit u to be in Dag (in order to add it to Dag). Initially, right after u gets -/// gets this status, we mark all the parents that are already in the Dag and set appropriate triggers for when -/// the remaining parents are added to Dag. -/// 6) At the moment when all parents have been added to Dag, the unit itself is added to Dag. -/// -/// We also refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html -/// Section 5.3 for a discussion of this component. 
- -pub(crate) struct Terminal { - node_id: NodeIndex, - // A channel for receiving notifications (units mainly) - ntfct_rx: Receiver>, - // A channel to push outgoing notifications - ntfct_tx: Sender>, - // A Queue to handle events happening in the Terminal. The reason of this being a queue is because - // some events trigger other events and because of the Dag structure, these should be handled - // in a FIFO order (as in BFS) and not recursively (as in DFS). - event_queue: VecDeque>, - post_insert: Vec, ()>>, - // Here we store all the units -- the ones in Dag and the ones "hanging". - unit_store: HashMap>, - - // In this Map, for each pair (r, pid) we store the first unit made by pid at round r that we ever received. - // In case of forks, we still store only the first one -- others are ignored (but stored in store under their hash). - unit_by_coord: HashMap<(Round, NodeIndex), H::Hash>, - // This stores, for a pair (r, pid) the list of all units (by hash) that await a unit made by pid at - // round r, as their parent. Once such a unit arrives, we notify all these children. - children_coord: HashMap<(Round, NodeIndex), Vec>, - // The same as above, but this time we await for a unit (with a particular hash) to be added to the Dag. - // Once this happens, we notify all the children. 
- children_hash: HashMap>, - exiting: bool, -} - -impl Terminal { - pub(crate) fn new( - node_id: NodeIndex, - ntfct_rx: Receiver>, - ntfct_tx: Sender>, - ) -> Self { - Terminal { - node_id, - ntfct_rx, - ntfct_tx, - event_queue: VecDeque::new(), - post_insert: Vec::new(), - unit_store: HashMap::new(), - unit_by_coord: HashMap::new(), - children_coord: HashMap::new(), - children_hash: HashMap::new(), - exiting: false, - } - } - - // Reconstruct the parent of a unit u (given by hash u_hash) at position pid as p (given by hash p_hash) - fn reconstruct_parent(&mut self, u_hash: &H::Hash, pid: NodeIndex, p_hash: &H::Hash) { - let u = self.unit_store.get_mut(u_hash).unwrap(); - // the above unwraps must succeed, should probably add some debug messages here... - - u.parents.insert(pid, *p_hash); - u.n_miss_par_decoded -= NodeCount(1); - if u.n_miss_par_decoded == NodeCount(0) { - self.event_queue - .push_back(TerminalEvent::ParentsReconstructed(*u_hash)); - } - } - - // update u's count (u given by hash u_hash) of parents present in Dag and trigger the event of - // adding u itself to the Dag - fn new_parent_in_dag(&mut self, u_hash: &H::Hash) { - let u = self.unit_store.get_mut(u_hash).unwrap(); - u.n_miss_par_dag -= NodeCount(1); - if u.n_miss_par_dag == NodeCount(0) { - self.event_queue - .push_back(TerminalEvent::ParentsInDag(*u_hash)); - } - } - - // Adds the unit u (u given by hash u_hash) to the list of units waiting for the coord (round, pid) to be - // added to store. - fn add_coord_trigger(&mut self, round: Round, pid: NodeIndex, u_hash: H::Hash) { - let coord = (round, pid); - if !self.children_coord.contains_key(&coord) { - self.children_coord.insert((round, pid), Vec::new()); - } - let wait_list = self.children_coord.get_mut(&coord).unwrap(); - wait_list.push(u_hash); - } - - // Adds the unit u (u given by hash u_hash) to the list of units waiting for the unit p (p_hash) to be - // added to the Dag. 
- fn add_hash_trigger(&mut self, p_hash: &H::Hash, u_hash: &H::Hash) { - if !self.children_hash.contains_key(p_hash) { - self.children_hash.insert(*p_hash, Vec::new()); - } - let wait_list = self.children_hash.get_mut(p_hash).unwrap(); - wait_list.push(*u_hash); - } - - fn update_on_store_add(&mut self, u: Unit) { - let u_hash = u.hash(); - let (u_round, pid) = (u.round(), u.creator()); - // We place the unit in the coord map only if this is the first variant ever received. - // This is not crucial for correctness, but helps in clarity. - if let Entry::Vacant(entry) = self.unit_by_coord.entry((u_round, pid)) { - entry.insert(u_hash); - } else { - debug!(target: "AlephBFT-terminal", "Received a fork at round {:?} creator {:?}", u_round, pid); - } - - if let Some(children) = self.children_coord.remove(&(u_round, pid)) { - // We reconstruct the parent for each unit that waits for this coord. - for v_hash in children { - self.reconstruct_parent(&v_hash, pid, &u_hash); - } - } - if u_round == 0 { - self.event_queue - .push_back(TerminalEvent::ParentsReconstructed(u_hash)); - } else { - let mut coords_to_request = Vec::new(); - for i in u.control_hash().parents() { - let coord = (u_round - 1, i); - let maybe_hash = self.unit_by_coord.get(&coord).cloned(); - match maybe_hash { - Some(v_hash) => self.reconstruct_parent(&u_hash, i, &v_hash), - None => { - self.add_coord_trigger(u_round - 1, i, u_hash); - let coord = UnitCoord::new(u.round() - 1, i); - coords_to_request.push(coord); - } - } - } - if !coords_to_request.is_empty() { - trace!(target: "AlephBFT-terminal", "{:?} Missing coords {:?}", self.node_id, coords_to_request); - self.send_notification(NotificationOut::MissingUnits(coords_to_request)); - } - } - } - - fn update_on_dag_add(&mut self, u_hash: &H::Hash) { - let u = self - .unit_store - .get(u_hash) - .expect("Unit to be added to dag must be in store") - .clone(); - self.post_insert.iter_mut().for_each(|f| f(u.clone())); - if let Some(children) = 
self.children_hash.remove(u_hash) { - for v_hash in children { - self.new_parent_in_dag(&v_hash); - } - } - let mut parent_hashes = Vec::new(); - for p_hash in u.parents.into_values() { - parent_hashes.push(p_hash); - } - - self.send_notification(NotificationOut::AddedToDag(*u_hash, parent_hashes)); - } - - // We set the correct parent hashes for unit u. - fn update_on_wrong_hash_response(&mut self, u_hash: H::Hash, p_hashes: Vec) { - let u = self - .unit_store - .get_mut(&u_hash) - .expect("unit with wrong control hash must be in store"); - if u.status != UnitStatus::WrongControlHash { - trace!(target: "AlephBFT-terminal", "{:?} Received parents response without it being expected for {:?}. Ignoring.", self.node_id, u_hash); - return; - } - for (counter, i) in u.unit.control_hash().parents().enumerate() { - u.parents.insert(i, p_hashes[counter]); - } - trace!(target: "AlephBFT-terminal", "{:?} Updating parent hashes for wrong control hash unit {:?}", self.node_id, u_hash); - u.n_miss_par_decoded = NodeCount(0); - self.inspect_parents_in_dag(&u_hash); - } - - fn add_to_store(&mut self, u: Unit) { - trace!(target: "AlephBFT-terminal", "{:?} Adding to store {:?} round {:?} index {:?}", self.node_id, u.hash(), u.round(), u.creator()); - if let Entry::Vacant(entry) = self.unit_store.entry(u.hash()) { - entry.insert(TerminalUnit::::blank_from_unit(&u)); - self.update_on_store_add(u); - } - } - - fn inspect_parents_in_dag(&mut self, u_hash: &H::Hash) { - let u_parents = self.unit_store.get(u_hash).unwrap().parents.clone(); - let mut n_parents_in_dag = NodeCount(0); - for p_hash in u_parents.into_values() { - let maybe_p = self.unit_store.get(&p_hash); - // p might not be even in store because u might be a unit with wrong control hash - match maybe_p { - Some(p) if p.status == UnitStatus::InDag => { - n_parents_in_dag += NodeCount(1); - } - _ => { - self.add_hash_trigger(&p_hash, u_hash); - } - } - } - let u = self.unit_store.get_mut(u_hash).unwrap(); - u.n_miss_par_dag -= 
n_parents_in_dag; - trace!(target: "AlephBFT-terminal", "{:?} Inspecting parents for {:?}, missing {:?}", self.node_id, u_hash, u.n_miss_par_dag); - if u.n_miss_par_dag == NodeCount(0) { - self.event_queue - .push_back(TerminalEvent::ParentsInDag(*u_hash)); - } else { - u.status = UnitStatus::WaitingParentsInDag; - // when the last parent is added an appropriate TerminalEvent will be triggered - } - } - - fn on_wrong_hash_detected(&mut self, u_hash: H::Hash) { - self.send_notification(NotificationOut::WrongControlHash(u_hash)); - } - - // This drains the event queue. Note that new events might be added to the queue as the result of - // handling other events -- for instance when a unit u is waiting for its parent p, and this parent p waits - // for his parent pp. In this case adding pp to the Dag, will trigger adding p, which in turns triggers - // adding u. - fn handle_events(&mut self) { - while let Some(event) = self.event_queue.pop_front() { - match event { - TerminalEvent::ParentsReconstructed(u_hash) => { - let u = self.unit_store.get_mut(&u_hash).unwrap(); - if u.verify_control_hash() { - self.inspect_parents_in_dag(&u_hash); - } else { - u.status = UnitStatus::WrongControlHash; - warn!(target: "AlephBFT-terminal", "{:?} wrong control hash", self.node_id); - self.on_wrong_hash_detected(u_hash); - } - } - TerminalEvent::ParentsInDag(u_hash) => { - let u = self.unit_store.get_mut(&u_hash).unwrap(); - u.status = UnitStatus::InDag; - trace!(target: "AlephBFT-terminal", "{:?} Adding to Dag {:?} round {:?} index {:?}.", self.node_id, u_hash, u.unit.round(), u.unit.creator()); - self.update_on_dag_add(&u_hash); - } - } - } - } - - pub(crate) fn register_post_insert_hook(&mut self, hook: SyncClosure, ()>) { - self.post_insert.push(hook); - } - - fn send_notification(&mut self, notification: NotificationOut) { - if self.ntfct_tx.unbounded_send(notification).is_err() { - warn!(target: "AlephBFT-terminal", "{:?} Notification channel should be open", self.node_id); - 
self.exiting = true; - } - } - - pub(crate) async fn run(&mut self, mut terminator: Terminator) { - loop { - futures::select! { - n = self.ntfct_rx.next() => { - match n { - Some(NotificationIn::NewUnits(units)) => { - for u in units { - self.add_to_store(u); - self.handle_events(); - } - }, - Some(NotificationIn::UnitParents(u_hash, p_hashes)) => { - self.update_on_wrong_hash_response(u_hash, p_hashes); - self.handle_events(); - }, - _ => {} - } - } - _ = terminator.get_exit().fuse() => { - debug!(target: "AlephBFT-terminal", "{:?} received exit signal", self.node_id); - self.exiting = true; - } - } - if self.exiting { - debug!(target: "AlephBFT-terminal", "{:?} Terminal decided to exit.", self.node_id); - terminator.terminate_sync().await; - break; - } - } - } -} diff --git a/consensus/src/testing/dag.rs b/consensus/src/testing/dag.rs index ac84ec5a..e9ebfe8f 100644 --- a/consensus/src/testing/dag.rs +++ b/consensus/src/testing/dag.rs @@ -85,7 +85,7 @@ impl ConsensusDagFeeder { fn on_consensus_notification(&self, notification: NotificationOut) { match notification { NotificationOut::WrongControlHash(h) => { - // We need to answer these requests as otherwise terminal cannot make progress + // We need to answer these requests as otherwise reconstruction cannot make progress let parent_hashes = self.units_map.get(&h).unwrap().parent_hashes_vec(); let notification = NotificationIn::UnitParents(h, parent_hashes); self.tx_in.unbounded_send(notification).unwrap(); diff --git a/consensus/src/units/mod.rs b/consensus/src/units/mod.rs index 710815c5..087687f1 100644 --- a/consensus/src/units/mod.rs +++ b/consensus/src/units/mod.rs @@ -221,29 +221,70 @@ impl Unit { } #[cfg(test)] -mod tests { +pub mod tests { use crate::{ - units::{ControlHash, FullUnit as GenericFullUnit, PreUnit as GenericPreUnit}, - Hasher, NodeIndex, + units::{ControlHash, FullUnit, PreUnit}, + Hasher, NodeCount, NodeIndex, NodeMap, Round, }; use aleph_bft_mock::{Data, Hasher64}; use codec::{Decode, 
Encode}; - type PreUnit = GenericPreUnit; - type FullUnit = GenericFullUnit; + pub type TestPreUnit = PreUnit; + pub type TestFullUnit = FullUnit; + + fn random_initial_units(n_members: NodeCount) -> Vec { + n_members + .into_iterator() + .map(|node_id| { + let control_hash = ControlHash::::new(&vec![None; n_members.0].into()); + let pre_unit = TestPreUnit::new(node_id, 0, control_hash); + FullUnit::new(pre_unit, rand::random(), 43) + }) + .collect() + } + + fn random_unit_with_parents(creator: NodeIndex, parents: &Vec) -> TestFullUnit { + let representative_parent = parents.last().expect("there are parents"); + let n_members = representative_parent.as_pre_unit().n_members(); + let round = representative_parent.round() + 1; + let mut parent_map = NodeMap::with_size(n_members); + for parent in parents { + parent_map.insert(parent.creator(), parent.hash()); + } + let control_hash = ControlHash::::new(&parent_map); + let pre_unit = TestPreUnit::new(creator, round, control_hash); + TestFullUnit::new(pre_unit, rand::random(), 43) + } + + pub fn random_full_parent_units_up_to( + round: Round, + n_members: NodeCount, + ) -> Vec> { + let mut result = vec![random_initial_units(n_members)]; + for _ in 0..round { + let units = n_members + .into_iterator() + .map(|node_id| { + random_unit_with_parents( + node_id, + result.last().expect("previous round present"), + ) + }) + .collect(); + result.push(units); + } + result + } #[test] fn test_full_unit_hash_is_correct() { - let ch = ControlHash::::new(&vec![].into()); - let pre_unit = PreUnit::new(NodeIndex(5), 6, ch); - let full_unit = FullUnit::new(pre_unit, Some(7), 8); - let hash = full_unit.using_encoded(Hasher64::hash); - assert_eq!(full_unit.hash(), hash); - let ch = ControlHash::::new(&vec![].into()); - let pre_unit = PreUnit::new(NodeIndex(5), 6, ch); - let full_unit = FullUnit::new(pre_unit, None, 8); - let hash = full_unit.using_encoded(Hasher64::hash); - assert_eq!(full_unit.hash(), hash); + for full_unit in 
random_full_parent_units_up_to(3, NodeCount(4)) + .into_iter() + .flatten() + { + let hash = full_unit.using_encoded(Hasher64::hash); + assert_eq!(full_unit.hash(), hash); + } } #[test] @@ -257,19 +298,14 @@ mod tests { #[test] fn test_full_unit_codec() { - let ch = ControlHash::::new(&vec![].into()); - let pre_unit = PreUnit::new(NodeIndex(5), 6, ch); - let full_unit = FullUnit::new(pre_unit, Some(7), 8); - full_unit.hash(); - let encoded = full_unit.encode(); - let decoded = FullUnit::decode(&mut encoded.as_slice()).expect("should decode correctly"); - assert_eq!(decoded, full_unit); - let ch = ControlHash::::new(&vec![].into()); - let pre_unit = PreUnit::new(NodeIndex(5), 6, ch); - let full_unit = FullUnit::new(pre_unit, None, 8); - full_unit.hash(); - let encoded = full_unit.encode(); - let decoded = FullUnit::decode(&mut encoded.as_slice()).expect("should decode correctly"); - assert_eq!(decoded, full_unit); + for full_unit in random_full_parent_units_up_to(3, NodeCount(4)) + .into_iter() + .flatten() + { + let encoded = full_unit.encode(); + let decoded = + TestFullUnit::decode(&mut encoded.as_slice()).expect("should decode correctly"); + assert_eq!(decoded, full_unit); + } } } diff --git a/docs/src/internals.md b/docs/src/internals.md index 37d5d35a..2c2561a6 100644 --- a/docs/src/internals.md +++ b/docs/src/internals.md @@ -6,11 +6,10 @@ To explain the inner workings of AlephBFT it is instructive to follow the path o 2. The newly created unit is filled with data, session information and a signature. This is done in `src/runway.rs` which then sends it to `src/member.rs` Subsequently a recurring task of broadcasting this unit is put in the task queue. The unit will be broadcast to all other nodes a few times (with some delays in between). 3. The unit is received by another node -- happens in `src/member.rs` and immediately send to `src/runway.rs` where it passes some validation (signature checks etc.). 
If all these checks pass and the unit is not detected to be a fork, then it is placed in the `UnitStore` -- the `store` field of the `Runway` struct. 4. The idea is that this store keeps only **legit units** in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam). Thus no fork is ever be put there unless coming from an alert. -5. At a suitable moment the units from the store are further moved to a component called `Terminal` -- implemented in `src/terminal.rs`. -6. Roughly speaking, terminal is expected to "unpack" the unit, so that their parents become explicit (instead of being control hashes only). -7. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag. -8. Dag units are passed to a component called the `Extender` -- see the files in `src/extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). -9. Once a unit's data is placed in one of batches by the `Extender` then its path is over and can be safely discarded. +5. At a suitable moment the units from the store are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `src/reconstruction/parents.rs`. +6. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag. This is ensured by the implementation in `src/reconstruction/dag.rs`. +7. Dag units are passed to a component called the `Extender` -- see the files in `src/extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). +8. Once a unit's data is placed in one of batches by the `Extender` then its path is over and can be safely discarded. 
### 5.1 Creator @@ -20,20 +19,27 @@ Since the creator does not have access to the `DataIO` object and to the `Keycha ### 5.2 Unit Store in Runway -As mentioned, the idea is that this stores only legit units and passes them to the `Terminal`. In case a fork is detected by a node `i`, all `i`'s units are attached to the appropriate alert. +As mentioned, the idea is that this stores only legit units and passes them to the reconstructing component. In case a fork is detected by a node `i`, all `i`'s units are attached to the appropriate alert. -### 5.3 Terminal +### 5.3 Reconstruction -The `Terminal` receives legit units, yet there might be two issues with such units that would not allow them to be added to Dag: +The next step is to reconstruct the structure of the Dag from the somewhat compressed information in the units. -1. A unit `U` might have a "wrong" control hash. From the perspective of the `Terminal` this means that naively picking `U`'s parents as units from previous round determined by `U.parent_map` results in a failed control hash check. Note that in case `U.creator` is honest and there are no forkers among `U` parents creators then this cannot happen and the control hash check always succeeds. Fail can happen because: - a) either `U`'s one or multiple parents are forks and the `Terminal` either does not have the correct variants yet, or just has them but performed the naive check on different variants (note that guessing the correct variants might require exponential time so there is no point for the terminal to even try it), - b) or `U`'s creator is dishonest and just put a control hash in the unit that does not "unhash" to anything meaningful. - In any case the terminal triggers a request to `Member` to download the full list of `U`'s parent hashes, so that the ambiguity is resolved. Once a correct reponse is received by `Member` then it is passed back to the terminal so that it can "decode" the parents and proceed. -2. 
A unit `U` might have parents that are not legit. Before adding a unit `U` to the Dag, the terminal waits for all parents of `U` to be added to the Dag first. +#### 5.3.1 Parents -There is often a situation where the terminal receives a unit `U` and for some reason there is no unit yet for a particular slot in `U`'s parents, i.e., `U`'s parent map says that one of the parents was created by node `i` but terminal has no unit with "coordinates" `(U.round - 1, i)` (`UnitCoord` type in the implementation -- means a pair consising of `(V.round, V.creator)` for some unit `V`). In such a case terminal makes a request to the `Member` to get such a unit, which is then followed by `Member` sending a series of requests to random nodes in order to fetch such a unit. +The reconstruction service receives legit units, but the information about their parents is only present as a control hash, i.e. which nodes created the parents and what was the combined hash of all the parents' hashes. Parents reconstruction remembers the first unit for any creator-round combination it encounters and optimistically uses this information to check the combined hash. If there are no dishonest nodes, which is the usual situation, then this means that every unit might at most have some parents that cannot yet be checked, because the node has not yet received them. In such a case requests for these missing units are sent to `Member`. After the units are received, the control hash check succeeds and thus the parents are reconstructed successfully. + +If dishonest nodes participate in the protocol, then two additional things can go wrong: + +1. either the unit has one or multiple parents that are forks, with variants different from the first ones received by this node to be precise. 
The reconstructing service might or might not have access to the correct variants, but in either case it does not attempt to perform the naive check on different variants -- guessing the correct variants might require exponential time so there is no point to even try it, +2. or the unit's creator is dishonest and just put a control hash in the unit that does not "unhash" to anything meaningful. + +In any case the reconstruction triggers a request to `Member` to download the full list of the unit's parent hashes, so that the ambiguity is resolved. Once a response is received by `Member` then it is passed back to the reconstruction so that it can "decode" the parents and proceed. + +#### 5.3.2 Dag + +The units' parents might, for many reasons, not be reconstructed in an order agreeing with the Dag order, i.e. some of their ancestors might not yet be reconstructed. The Dag component ensures that units are only added to the Dag after their parents were already added, and thus any units emitted by this component are in an order agreeing with the Dag order. ### 5.4 Extender -The `Extender`'s role is to receive Dag units (from `Terminal`) and extend the output stream. Towards this end it elects the `Head` for each `round`. Such an election works by going through candidate units from this round either eliminating them or eventually electing one. Votes are computed and cached for each candidate until a decision on it is made, after which the election moves on to the next round (if elected as `Head`) or to the next unit (otherwise). After electing every `Head` the `Extender` deterministically orders all its unordered ancestors and the `Head` itself and returns the resulting batch. +The `Extender`'s role is to receive Dag units (in an order agreeing with the Dag order) and extend the output stream. Towards this end it elects the `Head` for each `round`. Such an election works by going through candidate units from this round either eliminating them or eventually electing one.
Votes are computed and cached for each candidate until a decision on it is made, after which the election moves on to the next round (if elected as `Head`) or to the next unit (otherwise). After electing every `Head` the `Extender` deterministically orders all its unordered ancestors and the `Head` itself and returns the resulting batch. From d62f2f7ddfd9846d76c797d22803ab122ae093b5 Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 6 Mar 2024 11:10:52 +0100 Subject: [PATCH 2/5] Remove recurrency and other review fixes --- consensus/src/consensus.rs | 2 +- consensus/src/reconstruction/dag.rs | 37 +++++++++++++------------ consensus/src/reconstruction/mod.rs | 1 - consensus/src/reconstruction/parents.rs | 14 ++-------- 4 files changed, 23 insertions(+), 31 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 4d447bff..f3c4cc13 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -76,7 +76,7 @@ pub(crate) async fn run( futures::select! 
{ _ = terminator.get_exit().fuse() => {}, _ = reconstruction_handle => { - debug!(target: "AlephBFT-consensus", "{:?} reconstruction task terminated early.", index); + debug!(target: "AlephBFT-consensus", "{:?} unit reconstruction task terminated early.", index); }, _ = creator_panic_handle.fuse() => { error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index); diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/reconstruction/dag.rs index f96f03b3..4c324ef3 100644 --- a/consensus/src/reconstruction/dag.rs +++ b/consensus/src/reconstruction/dag.rs @@ -1,5 +1,5 @@ use crate::{reconstruction::ReconstructedUnit, Hasher}; -use std::collections::{HashMap, HashSet}; +use std::collections::{VecDeque, HashMap, HashSet}; struct OrphanedUnit { unit: ReconstructedUnit, @@ -69,20 +69,23 @@ impl Dag { } fn move_to_dag(&mut self, unit: ReconstructedUnit) -> Vec> { - let unit_hash = unit.hash(); - self.dag_units.insert(unit_hash, unit.clone()); - // Start with this unit, so that the units are in order at the end. 
- let mut result = vec![unit]; - for child in self.waiting_for.remove(&unit_hash).iter().flatten() { - match self - .orphaned_units - .remove(child) - .expect("we were waiting for parents") - .resolve_parent(unit_hash) - { - Ok(unit) => result.append(&mut self.move_to_dag(unit)), - Err(orphan) => { - self.orphaned_units.insert(*child, orphan); + let mut result = Vec::new(); + let mut ready_units = VecDeque::from([unit]); + while let Some(unit) = ready_units.pop_front() { + let unit_hash = unit.hash(); + self.dag_units.insert(unit_hash, unit.clone()); + result.push(unit); + for child in self.waiting_for.remove(&unit_hash).iter().flatten() { + match self + .orphaned_units + .remove(child) + .expect("we were waiting for parents") + .resolve_parent(unit_hash) + { + Ok(unit) => ready_units.push_back(unit), + Err(orphan) => { + self.orphaned_units.insert(*child, orphan); + } } } } @@ -180,7 +183,7 @@ mod test { #[test] fn reconstructs_units_in_order() { let mut dag = Dag::new(); - for units in reconstructed(random_full_parent_units_up_to(7, NodeCount(4))) { + for units in reconstructed(random_full_parent_units_up_to(7000, NodeCount(4))) { for unit in units { let reconstructed = dag.add_unit(unit.clone()); assert_eq!(reconstructed, vec![unit]); @@ -190,7 +193,7 @@ mod test { #[test] fn reconstructs_units_in_reverse_order() { - let full_unit_dag = random_full_parent_units_up_to(7, NodeCount(4)); + let full_unit_dag = random_full_parent_units_up_to(7000, NodeCount(4)); let mut hash_batches: Vec<_> = full_unit_dag .iter() .map(unit_hashes) diff --git a/consensus/src/reconstruction/mod.rs b/consensus/src/reconstruction/mod.rs index 74b13d29..376a46d9 100644 --- a/consensus/src/reconstruction/mod.rs +++ b/consensus/src/reconstruction/mod.rs @@ -150,7 +150,6 @@ impl Service { } } Unit(unit) => { - // TODO add other destinations if self.units_for_creator.unbounded_send(unit.unit()).is_err() { warn!(target: LOG_TARGET, "Creator channel should be open."); return false; diff --git 
a/consensus/src/reconstruction/parents.rs b/consensus/src/reconstruction/parents.rs index 67f51d3f..d1c057b7 100644 --- a/consensus/src/reconstruction/parents.rs +++ b/consensus/src/reconstruction/parents.rs @@ -67,10 +67,7 @@ impl ReconstructingUnit { } fn control_hash(&self) -> &ControlHash { - use ReconstructingUnit::*; - match self { - Reconstructing(unit, _) | WaitingForParents(unit) => unit.control_hash(), - } + self.as_unit().control_hash() } fn as_unit(&self) -> &Unit { @@ -80,20 +77,13 @@ impl ReconstructingUnit { } } - fn into_unit(self) -> Unit { - use ReconstructingUnit::*; - match self { - Reconstructing(unit, _) | WaitingForParents(unit) => unit, - } - } - fn with_parents(self, parent_hashes: Vec) -> Result, Self> { let control_hash = self.control_hash(); let mut parents = NodeMap::with_size(control_hash.n_members()); for (parent_id, parent_hash) in control_hash.parents().zip(parent_hashes.into_iter()) { parents.insert(parent_id, parent_hash); } - ReconstructedUnit::with_parents(self.clone().into_unit(), parents).map_err(|_| self) + ReconstructedUnit::with_parents(self.as_unit().clone(), parents).map_err(|_| self) } } From 1d99d3244755d4792c8146785fe78ad2ec34690b Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 6 Mar 2024 11:12:03 +0100 Subject: [PATCH 3/5] Ofc forgot fmt... 
--- consensus/src/reconstruction/dag.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/reconstruction/dag.rs index 4c324ef3..ecc0497c 100644 --- a/consensus/src/reconstruction/dag.rs +++ b/consensus/src/reconstruction/dag.rs @@ -1,5 +1,5 @@ use crate::{reconstruction::ReconstructedUnit, Hasher}; -use std::collections::{VecDeque, HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; struct OrphanedUnit { unit: ReconstructedUnit, From 78ae0b23017717d1ba6bf5273997d675ca309811 Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 6 Mar 2024 16:34:25 +0100 Subject: [PATCH 4/5] Exit strategy --- consensus/src/reconstruction/mod.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/consensus/src/reconstruction/mod.rs b/consensus/src/reconstruction/mod.rs index 376a46d9..07f76537 100644 --- a/consensus/src/reconstruction/mod.rs +++ b/consensus/src/reconstruction/mod.rs @@ -180,31 +180,26 @@ impl Service { /// Run the reconstruction service until terminated. pub async fn run(mut self, mut terminator: Terminator) { - let mut exiting = false; loop { futures::select! 
{ n = self.notifications_from_runway.next() => match n { Some(notification) => for output in self.handle_notification(notification) { if !self.handle_output(output) { - exiting = true; - break; + return; } }, None => { warn!(target: LOG_TARGET, "Notifications for reconstruction unexpectedly ended."); - exiting = true; + return; } }, _ = terminator.get_exit().fuse() => { debug!(target: LOG_TARGET, "Received exit signal."); - exiting = true; + break; } } - if exiting { - debug!(target: LOG_TARGET, "Reconstruction decided to exit."); - terminator.terminate_sync().await; - break; - } } + debug!(target: LOG_TARGET, "Reconstruction decided to exit."); + terminator.terminate_sync().await; } } From db03c11884d488dbbcdbe34084c3823c8eeef706 Mon Sep 17 00:00:00 2001 From: timorleph Date: Mon, 11 Mar 2024 12:09:01 +0100 Subject: [PATCH 5/5] Remove redundant units --- consensus/src/reconstruction/dag.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/reconstruction/dag.rs index ecc0497c..25001acb 100644 --- a/consensus/src/reconstruction/dag.rs +++ b/consensus/src/reconstruction/dag.rs @@ -55,7 +55,7 @@ impl OrphanedUnit { pub struct Dag { orphaned_units: HashMap>, waiting_for: HashMap>, - dag_units: HashMap>, + dag_units: HashSet, } impl Dag { @@ -64,7 +64,7 @@ impl Dag { Dag { orphaned_units: HashMap::new(), waiting_for: HashMap::new(), - dag_units: HashMap::new(), + dag_units: HashSet::new(), } } @@ -73,7 +73,7 @@ impl Dag { let mut ready_units = VecDeque::from([unit]); while let Some(unit) = ready_units.pop_front() { let unit_hash = unit.hash(); - self.dag_units.insert(unit_hash, unit.clone()); + self.dag_units.insert(unit_hash); result.push(unit); for child in self.waiting_for.remove(&unit_hash).iter().flatten() { match self @@ -95,14 +95,14 @@ impl Dag { /// Add a unit to the Dag. 
Returns all the units that now have all their parents in the Dag, /// in an order agreeing with the Dag structure. pub fn add_unit(&mut self, unit: ReconstructedUnit) -> Vec> { - if self.dag_units.contains_key(&unit.hash()) { + if self.dag_units.contains(&unit.hash()) { // Deduplicate. return Vec::new(); } let missing_parents = unit .parents() .values() - .filter(|parent| !self.dag_units.contains_key(parent)) + .filter(|parent| !self.dag_units.contains(parent)) .cloned() .collect(); match OrphanedUnit::new(unit, missing_parents) {