Skip to content

Commit

Permalink
Merge branch 'main' into romac/broadcast-status
Browse files Browse the repository at this point in the history
  • Loading branch information
romac authored Nov 26, 2024
2 parents 4469385 + 29311f2 commit 63b4413
Show file tree
Hide file tree
Showing 16 changed files with 618 additions and 252 deletions.
13 changes: 4 additions & 9 deletions code/crates/consensus/src/handle/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,15 @@ where
// Drop all others.
if state.driver.round() == Round::Nil {
debug!("Received proposal at round -1, queuing for later");
state
.input_queue
.push_back(Input::Proposal(signed_proposal));
state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal));

return Ok(());
}

if proposal_height > consensus_height {
if consensus_height.increment() == proposal_height {
debug!("Received proposal for next height, queuing for later");
debug!("Received proposal for higher height, queuing for later");
state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal));

state
.input_queue
.push_back(Input::Proposal(signed_proposal));
}
return Ok(());
}

Expand Down
13 changes: 7 additions & 6 deletions code/crates/consensus/src/handle/proposed_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ where
}

if state.driver.height() < proposed_value.height {
if state.driver.height().increment() == proposed_value.height {
debug!("Received value for next height, queuing for later");
state
.input_queue
.push_back(Input::ProposedValue(proposed_value, origin));
}
debug!("Received value for higher height, queuing for later");

state.buffer_input(
proposed_value.height,
Input::ProposedValue(proposed_value, origin),
);

return Ok(());
}

Expand Down
21 changes: 10 additions & 11 deletions code/crates/consensus/src/handle/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ where
"Received vote at round -1, queuing for later"
);

state.input_queue.push_back(Input::Vote(signed_vote));
state.buffer_input(vote_height, Input::Vote(signed_vote));
return Ok(());
}

if consensus_height < vote_height {
if consensus_height.increment() == vote_height {
debug!(
consensus.height = %consensus_height,
vote.height = %vote_height,
validator = %validator_address,
"Received vote for next height, queuing for later"
);

state.input_queue.push_back(Input::Vote(signed_vote));
}
debug!(
consensus.height = %consensus_height,
vote.height = %vote_height,
validator = %validator_address,
"Received vote for higher height, queuing for later"
);

state.buffer_input(vote_height, Input::Vote(signed_vote));

return Ok(());
}

Expand Down
19 changes: 11 additions & 8 deletions code/crates/consensus/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use tracing::debug;
use std::collections::{BTreeMap, BTreeSet};
use tracing::{debug, warn};

use malachite_common::*;
use malachite_driver::Driver;
use tracing::warn;

use crate::input::Input;
use crate::{FullProposal, FullProposalKeeper};
use crate::{Params, ProposedValue};
use crate::util::max_queue::MaxQueue;
use crate::{FullProposal, FullProposalKeeper, Params, ProposedValue};

/// The state maintained by consensus for processing a [`Input`][crate::Input].
pub struct State<Ctx>
Expand All @@ -23,9 +22,8 @@ where
/// Driver for the per-round consensus state machine
pub driver: Driver<Ctx>,

/// A queue of inputs that were received before the
/// driver started the new height.
pub input_queue: VecDeque<Input<Ctx>>,
/// A queue of inputs that were received before the driver started.
pub input_queue: MaxQueue<Ctx::Height, Input<Ctx>>,

/// The proposals to decide on.
pub full_proposal_keeper: FullProposalKeeper<Ctx>,
Expand Down Expand Up @@ -151,6 +149,11 @@ where
self.full_proposal_keeper.remove_full_proposals(height)
}

/// Queue an input for later processing, only keep inputs for the highest height seen so far.
pub fn buffer_input(&mut self, height: Ctx::Height, input: Input<Ctx>) {
self.input_queue.push(height, input);
}

pub fn print_state(&self) {
if let Some(per_round) = self.driver.votes().per_round(self.driver.round()) {
warn!(
Expand Down
163 changes: 163 additions & 0 deletions code/crates/consensus/src/util/max_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/// A data structure that maintains a queue of values associated with monotonically increasing indices,
/// retaining only those values associated with the maximum index seen so far.
///
/// # Type Parameters
/// - `I`: The type of the index associated with each value in the queue.
/// - `T`: The type of values stored in the queue.
///
/// # Invariant
/// - All values in the queue are associated with the maximum index observed so far.
#[derive(Clone, Debug)]
pub struct MaxQueue<I, T> {
/// The highest index observed, which determines the values retained in the queue.
highest_index: I,

/// A vector storing the values associated with the maximum index.
/// Values are appended in the order they are pushed.
queue: Vec<T>,
}

impl<I, T> Default for MaxQueue<I, T>
where
I: Default,
{
/// Creates a `MaxQueue` with the default index value and an empty queue.
///
/// # Returns
/// - A `MaxQueue` instance with `current` initialized to the default value of `I` and an empty `queue`.
fn default() -> Self {
Self {
highest_index: Default::default(),
queue: Default::default(),
}
}
}

impl<I, T> MaxQueue<I, T> {
/// Constructs a new, empty `MaxQueue` with its index set to default.
///
/// # Returns
/// - A new `MaxQueue` with default `current` index and an empty queue.
pub fn new() -> Self
where
I: Default,
{
Self::default()
}

/// Pushes a value into the queue with an associated index.
///
/// - If the `index` is greater than the highest index seen so far, the queue is cleared,
/// the highest index seen so far is updated, and the value is added.
/// - If the `index` is equal to the highest index seen so far, the value is appended to the queue.
/// - If the `index` is less than the highest index seen so far, the value is ignored.
///
/// # Arguments
/// - `index`: The index associated with the value.
/// - `value`: The value to be stored in the queue.
///
/// # Returns
/// - Whether or not the value was inserted into the queue.
#[allow(clippy::comparison_chain)]
pub fn push(&mut self, index: I, value: T) -> bool
where
I: Ord,
{
if index > self.highest_index {
// New highest index, clear the queue, insert the new value
self.highest_index = index;
self.queue.clear();
self.queue.push(value);
true
} else if index == self.highest_index {
// Same index, insert the new value
self.queue.push(value);
true
} else {
// Smaller index, ignore the value
false
}
}

/// Returns an iterator over references to the values in the queue.
///
/// # Returns
/// - An iterator producing references to each value stored in the queue in order of insertion.
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.queue.iter()
}

/// Returns how many values are stored in queue.
pub fn len(&self) -> usize {
self.queue.len()
}

/// Returns whether the queue is empty.
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}

/// Returns the queue.
pub fn into_vec(self) -> Vec<T> {
self.queue
}

/// Returns a clone of the queue.
pub fn to_vec(&self) -> Vec<T>
where
T: Clone,
{
self.queue.to_vec()
}
}

/// Consumes the `MaxQueue` and returns an iterator that yields its values.
///
/// # Returns
/// - An iterator over values in the queue.
impl<I, T> IntoIterator for MaxQueue<I, T> {
type Item = T;
type IntoIter = std::vec::IntoIter<T>;

fn into_iter(self) -> Self::IntoIter {
self.queue.into_iter()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_max_queue() {
let mut queue = MaxQueue::new();

assert!(queue.is_empty());
assert_eq!(queue.len(), 0);

assert!(queue.push(1, "one"));
assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());
assert_eq!(queue.to_vec(), vec!["one"]);

assert!(queue.push(2, "two"));
assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());
assert_eq!(queue.to_vec(), vec!["two"]);

assert!(!queue.push(1, "one again"));
assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());
assert_eq!(queue.to_vec(), vec!["two"]);

assert!(queue.push(2, "two again"));
assert_eq!(queue.len(), 2);
assert!(!queue.is_empty());
assert_eq!(queue.to_vec(), vec!["two", "two again"]);

assert!(queue.push(3, "three"));
assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());
assert_eq!(queue.to_vec(), vec!["three"]);
}
}
1 change: 1 addition & 0 deletions code/crates/consensus/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod max_queue;
pub mod pretty;
60 changes: 50 additions & 10 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ impl HostState {
stream_id
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_block_from_parts(
&self,
parts: &[Arc<ProposalPart>],
height: Height,
round: Round,
) -> Option<(ProposedValue<MockContext>, Block)> {
let value = self.build_value_from_parts(parts, height, round).await?;

let txes = parts
.iter()
.filter_map(|part| part.as_transactions())
.flat_map(|txes| txes.to_vec())
.collect::<Vec<_>>();

let block = Block {
height,
transactions: Transactions::new(txes),
block_hash: value.value,
};

Some((value, block))
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_value_from_parts(
&self,
Expand Down Expand Up @@ -293,7 +317,7 @@ impl StarknetHost {
match state.block_store.prune(retain_height).await {
Ok(pruned) => {
debug!(
%retain_height, pruned = pruned.iter().join(", "),
%retain_height, pruned_heights = pruned.iter().join(", "),
"Pruned the block store"
);
}
Expand Down Expand Up @@ -371,6 +395,7 @@ impl Actor for StarknetHost {

while let Some(part) = rx_part.recv().await {
state.host.part_store.store(height, round, part.clone());

if state.host.params.value_payload.include_parts() {
debug!(%stream_id, %sequence, "Broadcasting proposal part");

Expand Down Expand Up @@ -404,17 +429,28 @@ impl Actor for StarknetHost {

let parts = state.host.part_store.all_parts(height, round);

let extension = state.host.generate_vote_extension(height, round);
let Some((value, block)) =
state.build_block_from_parts(&parts, height, round).await
else {
error!(%height, %round, "Failed to build block from parts");
return Ok(());
};

if let Some(value) = state.build_value_from_parts(&parts, height, round).await {
reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
extension,
))?;
if let Err(e) = state
.block_store
.store_undecided_block(value.height, value.round, block)
.await
{
error!(%e, %height, %round, "Failed to store the proposed block");
}

reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
value.extension,
))?;

Ok(())
}

Expand Down Expand Up @@ -545,7 +581,11 @@ impl Actor for StarknetHost {
}

// Build the block from transaction parts and certificate, and store it
if let Err(e) = state.block_store.store(&certificate, &all_txes).await {
if let Err(e) = state
.block_store
.store_decided_block(&certificate, &all_txes)
.await
{
error!(%e, %height, %round, "Failed to store the block");
}

Expand Down
Loading

0 comments on commit 63b4413

Please sign in to comment.