From 7dda99fdf42b70d87dda3f521eecc63746dac38c Mon Sep 17 00:00:00 2001 From: Marcin Date: Tue, 26 Nov 2024 10:21:17 +0000 Subject: [PATCH] A0-4564: Modified example/ordering so that it sends unique elements from data provider (#495) * [WIP] * Fixed bug when cache is not cleared * fmt * Review --- Cargo.lock | 2 +- examples/ordering/Cargo.toml | 2 +- examples/ordering/run.sh | 234 ++++++++++++++++++++++---------- examples/ordering/src/dataio.rs | 21 +-- examples/ordering/src/main.rs | 79 ++++++----- 5 files changed, 214 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac54c006..9b9a8fc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,7 +83,7 @@ dependencies = [ [[package]] name = "aleph-bft-examples-ordering" -version = "0.0.3" +version = "0.1.0" dependencies = [ "aleph-bft", "aleph-bft-mock", diff --git a/examples/ordering/Cargo.toml b/examples/ordering/Cargo.toml index 4746f540..e35cb0a3 100644 --- a/examples/ordering/Cargo.toml +++ b/examples/ordering/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-examples-ordering" -version = "0.0.3" +version = "0.1.0" edition = "2021" authors = ["Cardinal Cryptography"] license = "Apache-2.0" diff --git a/examples/ordering/run.sh b/examples/ordering/run.sh index ef74a6aa..e703dae7 100755 --- a/examples/ordering/run.sh +++ b/examples/ordering/run.sh @@ -1,97 +1,187 @@ -#!/bin/bash +#!/usr/bin/env bash -usage() { - echo "Usage: ./run.sh [-n N_NODES] [-m N_MALFUNCTIONING_NODES] [-s N_STALLING_DATA_PROVIDERS] [-c N_CRASHES] [-o N_ORDERED_PER_CRASH] [-d RESTART_DELAY]" +set -eou pipefail + +function usage() { + cat << EOF +Usage: + This script is a demonstration usage of AlephBFT protocol, in which there are N nodes and they want to achieve + a consensus with regards to provided data. The data sent to AlephBFT from each node is a stream of integers from range + [0, N * DATA_ITEMS), where DATA_ITEMS is configurable. Each node of index 'i' sends to the consensus + integers from range [i * DATA_ITEMS; (i + 1) * DATA_ITEMS). where 0 <= i < N. At the end, each node makes + sure that it receives all integers from range [0, N * DATA_ITEMS), each integer exactly once. + + N nodes are started on your machine, and they communicate via UDP. Not all nodes behave correctly - some of them crash + or are stuck while providing data. + + This script is using aleph-bft-examples-ordering and assumes to be available in a relative folder from this script path + ../../target/release/aleph-bft-examples-ordering + + $0 + [-n|--nodes NODES] + number of all non-crashing nodes; some of them can have stalled data provider + [-c|--crashing-nodes CRASHING_NODES] + number of nodes that crash while providing data + [-s|--stalling-data-providers STALLING_DATA_PROVIDERS] + number of nodes that eventually stall while providing data; must be less than --nodes + [--crashes-count CRASHES_COUNT] + how many times a crashing node should crash + [--data-items DATA_ITEMS] + how many data items each node should order + [--crash-restart-delay-seconds CRASH_RESTART_DELAY_SECONDS] + delay (seconds) between subsequent node crashes + [--unit-creation-delay UNIT_CREATION_DELAY] + unit creation delay (milliseconds), default 200 +EOF + exit 0 +} + +NORMAL=$(tput sgr0) +GREEN=$(tput setaf 2; tput bold) +YELLOW=$(tput setaf 3) +RED=$(tput setaf 1) + +function get_timestamp() { + echo "$(date +'%Y-%m-%d %T:%3N')" +} + +function error() { + echo -e "$(get_timestamp) $RED$*$NORMAL" exit 1 } -N_NODES=2 -N_MALFUNCTIONING_NODES=2 -N_STALLING_DATA_PROVIDERS=1 -N_CRASHES=3 -N_ORDERED_PER_CRASH=25 -RESTART_DELAY=1 - -while getopts :n:m:s:c:o:d: flag; do - case "${flag}" in - n) N_NODES=${OPTARG};; - m) N_MALFUNCTIONING_NODES=${OPTARG};; - s) N_STALLING_DATA_PROVIDERS=${OPTARG};; - c) N_CRASHES=${OPTARG};; - o) N_ORDERED_PER_CRASH=${OPTARG};; - d) RESTART_DELAY=${OPTARG};; - *) usage;; - esac -done +function info() { + echo -e "$(get_timestamp) $GREEN$*$NORMAL" +} -n_ordered=$(( (N_CRASHES+1)*N_ORDERED_PER_CRASH )) -stalled=$(seq -s, 0 $((N_STALLING_DATA_PROVIDERS-1))) -port=10000 -ports="$port" -for i in $(seq 0 $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 2)); do - port=$((port+1)) - ports+=",$port" -done +function warning() { + echo -e "$(get_timestamp) $YELLOW$*$NORMAL" +} -set -e +function run_ordering_binary() { + local id="$1" + local starting_data_item="$2" + local data_items=$3 + local should_stall="${4:-no}" -cargo build --release -binary="../../target/release/aleph-bft-examples-ordering" + local binary_args=( + --id "$id" + --ports "${PORTS}" + --starting-data-item "${starting_data_item}" + --data-items "${data_items}" + --required-finalization-value "${EXPECTED_FINALIZED_DATA_ITEMS}" + --unit-creation-delay "${UNIT_CREATION_DELAY}" + ) + if [[ "${should_stall}" == "yes-stall" ]]; then + binary_args+=(--should-stall) + fi -clear + info "Starting node ${id} to provide items from ${starting_data_item} to $(( starting_data_item + data_items - 1 )), inclusive" + "${ordering_binary}" "${binary_args[@]}" 2>> "node${id}.log" > /dev/null & +} -run_crash_node () { +function run_crash_node () { id="$1" - n_starting=0 - n_data=$N_ORDERED_PER_CRASH - for (( i = 1; i <= N_CRASHES; i++ )); do - echo "Starting node $id at $n_starting items ($i/$((N_CRASHES+1)))..." - ! "$binary" --id "$id" --ports "$ports" --n-data "$n_data" --n-starting "$n_starting" --stalled "$stalled" --crash 2>> "node${id}.log" - echo "Node $id crashed. Respawning in $RESTART_DELAY seconds..." - sleep "$RESTART_DELAY" - n_starting=$n_data - n_data=$(( n_data + N_ORDERED_PER_CRASH )) + for run_attempt_index in $(seq 0 $(( CRASHES_COUNT - 1 ))); do + run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}" + pid=$! + info "Waiting ${CRASH_RESTART_DELAY_SECONDS} seconds..." + sleep "${CRASH_RESTART_DELAY_SECONDS}" + info "Killing node with pid ${pid}" + kill -9 "${pid}" 2> /dev/null done - echo "Starting node $id at $n_starting items ($((N_CRASHES+1))/$((N_CRASHES+1)))..." - "$binary" --id "$id" --ports "$ports" --n-data "$n_data" --n-starting "$n_starting" --stalled "$stalled" 2>> "node${id}.log" + run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}" } -for id in $(seq 0 $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 1)); do - rm -f "aleph-bft-examples-ordering-backup/${id}.units" - rm -f "node${id}.log" +NODES=2 +CRASHING_NODES=2 +STALLING_DATA_PROVIDERS=1 +CRASHES_COUNT=3 +DATA_ITEMS=25 +CRASH_RESTART_DELAY_SECONDS=5 +DATA_ITEMS_COUNTER=0 +UNIT_CREATION_DELAY=200 + +while [[ $# -gt 0 ]]; do + case "$1" in + -n|--nodes) + NODES="$2" + shift;shift + ;; + -c|--crashing-nodes) + CRASHING_NODES="$2" + shift;shift + ;; + -s|--stalling-data-providers) + STALLING_DATA_PROVIDERS="$2" + shift;shift + ;; + --crashes-count) + CRASHES_COUNT="$2" + shift;shift + ;; + --data-items) + DATA_ITEMS="$2" + shift;shift + ;; + --crash-restart-delay-seconds) + CRASH_RESTART_DELAY_SECONDS="$2" + shift;shift + ;; + --unit-creation-delay) + UNIT_CREATION_DELAY="$2" + shift;shift + ;; + --help) + usage + shift + ;; + *) + error "Unrecognized argument $1!" + ;; + esac done +script_path="${BASH_SOURCE[0]}" +script_dir=$(dirname "${script_path}") +ordering_binary_dir=$(realpath "${script_dir}/../../") +ordering_binary="${ordering_binary_dir}/target/release/aleph-bft-examples-ordering" -echo "WARNING -The current implementation of AlephBFT does not strictly guarantee -all input data to be included in the output - a property that will -be added in a future version. This issue occurs when the provider lags -behind other nodes. -Therefore, always check logs to see if there are any unexpected -errors - e.g. connection timeout - or if some crashed nodes are lagging -behind - messages \"Providing None\" are logged, but the total -amount of finalized data does not increase for this particular node. -In such case, try reducing the number of nodes or shortening -the restart delay. Another option is to make more than 1/3 of the nodes -malfunctioning - with that the protocol will stall while the nodes are -restarting so that there won't be a set of nodes that run out ahead. +if [[ ! -x "${ordering_binary}" ]]; then + error "${ordering_binary} does not exist or it's not an executable file!" +fi + +ALL_NODES=$(( NODES + CRASHING_NODES )) +PORTS=($(seq -s , 10000 $(( 10000 + ALL_NODES - 1 )))) +EXPECTED_FINALIZED_DATA_ITEMS=$(( ALL_NODES * DATA_ITEMS )) + +for id in $(seq 0 $(( ALL_NODES - 1 ))); do + rm -f "aleph-bft-examples-ordering-backup/${id}.units" + rm -f "node${id}.log" +done +info "Starting $0 PARAMETERS -number of nodes: $N_NODES -number of malfunctioning nodes: $N_MALFUNCTIONING_NODES -number of nodes with stalling DataProviders: $N_STALLING_DATA_PROVIDERS -number of forced crashes: $N_CRASHES -number of ordered data per crash: $N_ORDERED_PER_CRASH -restart delay: $RESTART_DELAY second(s) +number of nodes: ${NODES} +number of crashing nodes: ${CRASHING_NODES} +number of nodes with stalling DataProviders: ${STALLING_DATA_PROVIDERS} +number of forced crashes: ${CRASHES_COUNT} +number of ordered data per batch: ${DATA_ITEMS} +restart delay: ${CRASH_RESTART_DELAY_SECONDS} second(s) " -for id in $(seq 0 $(expr $N_NODES - 1)); do - echo "Starting node ${id}..." - "$binary" --id "$id" --ports "$ports" --n-data "$n_ordered" --stalled "$stalled" 2>> "node${id}.log" & +for id in $(seq 0 $(( NODES - 1 ))); do + if [[ "${id}" -lt "${STALLING_DATA_PROVIDERS}" ]]; then + run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}" "yes-stall" + else + run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}" + fi + DATA_ITEMS_COUNTER=$(( DATA_ITEMS_COUNTER + DATA_ITEMS )) done -for i in $(seq $(expr $N_NODES) $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 1)); do - run_crash_node "$i" & +for id in $(seq $(( NODES )) $(( ALL_NODES - 1 ))); do + run_crash_node "${id}" & + DATA_ITEMS_COUNTER=$(( DATA_ITEMS_COUNTER + DATA_ITEMS )) done trap 'kill $(jobs -p); wait' SIGINT SIGTERM diff --git a/examples/ordering/src/dataio.rs b/examples/ordering/src/dataio.rs index c136ee5b..f6b0e570 100644 --- a/examples/ordering/src/dataio.rs +++ b/examples/ordering/src/dataio.rs @@ -14,17 +14,19 @@ pub type Data = (NodeIndex, u32); #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Default, Decode, Encode)] pub struct DataProvider { id: NodeIndex, - counter: u32, - n_data: u32, + starting_data_item: u32, + data_items: u32, + current_data: u32, stalled: bool, } impl DataProvider { - pub fn new(id: NodeIndex, counter: u32, n_data: u32, stalled: bool) -> Self { + pub fn new(id: NodeIndex, starting_data_item: u32, data_items: u32, stalled: bool) -> Self { Self { id, - counter, - n_data, + starting_data_item, + current_data: starting_data_item, + data_items, stalled, } } @@ -35,7 +37,7 @@ impl DataProviderT for DataProvider { type Output = Data; async fn get_data(&mut self) -> Option { - if self.n_data == 0 { + if self.starting_data_item + self.data_items == self.current_data { if self.stalled { info!("Awaiting DataProvider::get_data forever"); pending::<()>().await; @@ -43,10 +45,9 @@ impl DataProviderT for DataProvider { info!("Providing None"); None } else { - let data = (self.id, self.counter); - info!("Providing data: {}", self.counter); - self.counter += 1; - self.n_data -= 1; + let data = (self.id, self.current_data); + info!("Providing data: {}", self.current_data); + self.current_data += 1; Some(data) } } diff --git a/examples/ordering/src/main.rs b/examples/ordering/src/main.rs index 0cab2992..5b4e3ac4 100644 --- a/examples/ordering/src/main.rs +++ b/examples/ordering/src/main.rs @@ -2,14 +2,15 @@ use std::io::Write; mod dataio; mod network; -use aleph_bft::{run_session, NodeIndex, Terminator}; +use aleph_bft::{default_delay_config, run_session, NodeIndex, Terminator}; use aleph_bft_mock::{Keychain, Spawner}; use clap::Parser; use dataio::{Data, DataProvider, FinalizationHandler}; use futures::{channel::oneshot, io, StreamExt}; use log::{debug, error, info}; use network::Network; -use std::{collections::HashMap, path::Path, time::Duration}; +use std::sync::Arc; +use std::{path::Path, time::Duration}; use time::{macros::format_description, OffsetDateTime}; use tokio::fs::{self, File}; use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt}; @@ -28,19 +29,24 @@ struct Args { /// Number of items to be ordered #[clap(long, value_parser)] - n_data: u32, + data_items: u32, /// Number of the first created item - #[clap(default_value = "0", long, value_parser)] - n_starting: u32, + #[clap(long, value_parser)] + starting_data_item: u32, - /// Indices of nodes having stalling DataProviders - #[clap(default_value = "", long, value_parser, value_delimiter = ',')] - stalled: Vec, + /// Should the node stall after providing all its items + #[clap(long, value_parser)] + should_stall: bool, - /// Should the node crash after finalizing its items + /// Value which denotes range of integers that must be seen as finalized from all nodes + /// ie all nodes must finalize integer sequence [0; required_finalization_value) #[clap(long, value_parser)] - crash: bool, + required_finalization_value: u32, + + /// Unit creation delay (milliseconds) + #[clap(long, default_value = "200", value_parser)] + unit_creation_delay: u64, } async fn create_backup( @@ -62,15 +68,6 @@ async fn create_backup( Ok((saver.compat_write(), loader)) } -fn finalized_counts(cf: &HashMap) -> Vec { - let mut v = cf - .iter() - .map(|(id, n)| (id.0, n)) - .collect::>(); - v.sort(); - v.iter().map(|(_, n)| **n).collect() -} - #[tokio::main] async fn main() { let time_format = @@ -95,12 +92,13 @@ async fn main() { let Args { id, ports, - n_data, - n_starting, - stalled, - crash, + data_items, + starting_data_item, + should_stall, + required_finalization_value, + unit_creation_delay, } = Args::parse(); - let stalled = stalled.contains(&id); + let id: NodeIndex = id.into(); info!("Getting network up."); @@ -108,7 +106,7 @@ async fn main() { .await .expect("Could not create a Network instance."); let n_members = ports.len().into(); - let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled); + let data_provider = DataProvider::new(id, starting_data_item, data_items, should_stall); let (finalization_handler, mut finalized_rx) = FinalizationHandler::new(); let (backup_saver, backup_loader) = create_backup(id) .await @@ -122,9 +120,12 @@ async fn main() { let (exit_tx, exit_rx) = oneshot::channel(); let member_terminator = Terminator::create_root(exit_rx, "AlephBFT-member"); + let mut delay_config = default_delay_config(); + delay_config.unit_creation_delay = + Arc::new(move |_| Duration::from_millis(unit_creation_delay)); let member_handle = tokio::spawn(async move { let keychain = Keychain::new(n_members, id); - let config = aleph_bft::default_config(n_members, id, 0, 5000, Duration::ZERO) + let config = aleph_bft::create_config(n_members, id, 0, 5000, delay_config, Duration::ZERO) .expect("Should always succeed with Duration::ZERO"); run_session( config, @@ -137,36 +138,34 @@ async fn main() { .await }); - let mut count_finalized: HashMap = - (0..ports.len()).map(|c| (c.into(), 0)).collect(); + let node_count = ports.len(); + let mut count_finalized = vec![0; node_count]; + + let mut finalized_items = vec![0; required_finalization_value as usize]; loop { match finalized_rx.next().await { Some((id, number)) => { - *count_finalized.get_mut(&id).unwrap() += 1; + count_finalized[id.0] += 1; + finalized_items[number as usize] += 1; debug!( "Finalized new item: node {:?}, number {:?}; total: {:?}", - id.0, - number, - finalized_counts(&count_finalized) + id.0, number, &count_finalized, ); } None => { error!( "Finalization stream finished too soon. Got {:?} items, wanted {:?} items", - finalized_counts(&count_finalized), - n_data + &count_finalized, data_items ); panic!("Finalization stream finished too soon."); } } - if crash && count_finalized.get(&id).unwrap() >= &(n_data) { - panic!( - "Forced crash - items finalized so far: {:?}.", - finalized_counts(&count_finalized) + if finalized_items.iter().all(|item| *item >= 1) { + info!( + "Finalized all items from 0 to {}, at least once.", + required_finalization_value - 1 ); - } else if count_finalized.values().all(|c| c >= &(n_data)) { - info!("Finalized required number of items."); info!("Waiting 10 seconds for other nodes..."); tokio::time::sleep(Duration::from_secs(10)).await; info!("Shutdown.");