Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into romac/input-queue-bou…
Browse files Browse the repository at this point in the history
…nd-size
  • Loading branch information
romac committed Nov 25, 2024
2 parents 089bfab + d68bb79 commit fc3ef84
Show file tree
Hide file tree
Showing 22 changed files with 529 additions and 253 deletions.
13 changes: 8 additions & 5 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{debug, error, info, warn};
use malachite_blocksync as blocksync;
use malachite_common::{
CommitCertificate, Context, Round, SignedExtension, Timeout, TimeoutStep, ValidatorSet,
ValueOrigin,
};
use malachite_config::TimeoutConfig;
use malachite_consensus::{Effect, Resume, ValueToPropose};
Expand Down Expand Up @@ -62,7 +63,7 @@ pub enum Msg<Ctx: Context> {
ProposeValue(Ctx::Height, Round, Ctx::Value, Option<SignedExtension<Ctx>>),

/// Received and assembled the full value proposed by a validator
ReceivedProposedValue(ProposedValue<Ctx>),
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),

/// Get the status of the consensus state machine
GetStatus(RpcReplyPort<Status<Ctx>>),
Expand Down Expand Up @@ -311,7 +312,9 @@ where
reply_to,
},
&myself,
|proposed| Msg::<Ctx>::ReceivedProposedValue(proposed),
|proposed| {
Msg::<Ctx>::ReceivedProposedValue(proposed, ValueOrigin::BlockSync)
},
None,
)?;

Expand Down Expand Up @@ -379,7 +382,7 @@ where
reply_to,
},
&myself,
|value| Msg::ReceivedProposedValue(value),
|value| Msg::ReceivedProposedValue(value, ValueOrigin::Consensus),
None,
)
.map_err(|e| {
Expand Down Expand Up @@ -418,9 +421,9 @@ where
Ok(())
}

Msg::ReceivedProposedValue(value) => {
Msg::ReceivedProposedValue(value, origin) => {
let result = self
.process_input(&myself, state, ConsensusInput::ProposedValue(value))
.process_input(&myself, state, ConsensusInput::ProposedValue(value, origin))
.await;

if let Err(e) = result {
Expand Down
2 changes: 1 addition & 1 deletion code/crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ pub use signing::SigningScheme;
pub use threshold::{Threshold, ThresholdParam, ThresholdParams};
pub use timeout::{Timeout, TimeoutStep};
pub use validator_set::{Address, Validator, ValidatorSet, VotingPower};
pub use value::{NilOrVal, Value};
pub use value::{NilOrVal, Value, ValueOrigin};
pub use vote::{Extension, Vote, VoteType};
9 changes: 9 additions & 0 deletions code/crates/common/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,12 @@ where
/// The ID of the value.
fn id(&self) -> Self::Id;
}

/// Protocols that diseminate `Value`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ValueOrigin {
/// Block Synchronization protocol
BlockSync,
/// Consensus protocol
Consensus,
}
4 changes: 3 additions & 1 deletion code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ where
Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Input::Propose(value) => on_propose(co, state, metrics, value).await,
Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await,
Input::ProposedValue(value) => on_proposed_value(co, state, metrics, value).await,
Input::ProposedValue(value, origin) => {
on_proposed_value(co, state, metrics, value, origin).await
}
Input::CommitCertificate(certificate) => {
on_commit_certificate(co, state, metrics, certificate).await
}
Expand Down
1 change: 0 additions & 1 deletion code/crates/consensus/src/handle/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ where
"Received proposal from a non-proposer"
);

// TODO - why when we replay proposals the proposer is wrong
return Ok(false);
};

Expand Down
15 changes: 11 additions & 4 deletions code/crates/consensus/src/handle/proposed_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn on_proposed_value<Ctx>(
state: &mut State<Ctx>,
metrics: &Metrics,
proposed_value: ProposedValue<Ctx>,
origin: ValueOrigin,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
Expand All @@ -27,15 +28,21 @@ where
}

if state.driver.height() < proposed_value.height {
debug!("Received value for next height, queuing for later");
state.buffer_input(proposed_value.height, Input::ProposedValue(proposed_value));
debug!("Received value for higher height, queuing for later");

state.buffer_input(
proposed_value.height,
Input::ProposedValue(proposed_value, origin),
);

return Ok(());
}

state.store_value(&proposed_value);

if state.params.value_payload.parts_only() {
// There are two cases where we need to generate an internal Proposal message for consensus to process the full proposal:
// a) In parts-only mode, where we do not get a Proposal message but only the proposal parts
// b) In any mode if the proposed value was provided by BlockSync, where we do net get a Proposal message but only the full value and the certificate
if state.params.value_payload.parts_only() || origin == ValueOrigin::BlockSync {
let proposal = Ctx::new_proposal(
proposed_value.height,
proposed_value.round,
Expand Down
9 changes: 6 additions & 3 deletions code/crates/consensus/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use derive_where::derive_where;

use malachite_common::{CommitCertificate, Context, SignedProposal, SignedVote, Timeout};
use malachite_common::{
CommitCertificate, Context, SignedProposal, SignedVote, Timeout, ValueOrigin,
};

use crate::types::ProposedValue;
use crate::ValueToPropose;
Expand All @@ -26,8 +28,9 @@ where
/// A timeout has elapsed
TimeoutElapsed(Timeout),

/// Received the full proposed value corresponding to a proposal
ProposedValue(ProposedValue<Ctx>),
/// Received the full proposed value corresponding to a proposal.
/// The origin denotes whether the value was received via consensus or BlockSync.
ProposedValue(ProposedValue<Ctx>, ValueOrigin),

/// Received a commit certificate from BlockSync
CommitCertificate(CommitCertificate<Ctx>),
Expand Down
25 changes: 14 additions & 11 deletions code/crates/consensus/tests/full_proposal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use malachite_common::{Context, Round, SignedProposal, Validity};
use malachite_common::{Context, Round, SignedProposal, Validity, ValueOrigin};
use malachite_consensus::{FullProposal, FullProposalKeeper, Input, ProposedValue};
use malachite_test::utils::validators::make_validators;
use malachite_test::{Address, Proposal, Value};
Expand Down Expand Up @@ -66,15 +66,18 @@ fn val_msg(
value: u64,
validity: Validity,
) -> Input<TestContext> {
Input::ProposedValue(ProposedValue {
height: Height::new(1),
round: Round::new(round),
valid_round: Round::Nil,
value: Value::new(value),
validity,
validator_address,
extension: Default::default(),
})
Input::ProposedValue(
ProposedValue {
height: Height::new(1),
round: Round::new(round),
valid_round: Round::Nil,
value: Value::new(value),
validity,
validator_address,
extension: Default::default(),
},
ValueOrigin::Consensus,
)
}

fn prop_at_round_and_value(
Expand Down Expand Up @@ -275,7 +278,7 @@ fn full_proposal_keeper_tests() {
for m in s.input {
match m {
Input::Proposal(p) => keeper.store_proposal(p),
Input::ProposedValue(v) => keeper.store_value(&v),
Input::ProposedValue(v, _) => keeper.store_value(&v),
_ => continue,
}
}
Expand Down
60 changes: 50 additions & 10 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ impl HostState {
stream_id
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_block_from_parts(
&self,
parts: &[Arc<ProposalPart>],
height: Height,
round: Round,
) -> Option<(ProposedValue<MockContext>, Block)> {
let value = self.build_value_from_parts(parts, height, round).await?;

let txes = parts
.iter()
.filter_map(|part| part.as_transactions())
.flat_map(|txes| txes.to_vec())
.collect::<Vec<_>>();

let block = Block {
height,
transactions: Transactions::new(txes),
block_hash: value.value,
};

Some((value, block))
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_value_from_parts(
&self,
Expand Down Expand Up @@ -293,7 +317,7 @@ impl StarknetHost {
match state.block_store.prune(retain_height).await {
Ok(pruned) => {
debug!(
%retain_height, pruned = pruned.iter().join(", "),
%retain_height, pruned_heights = pruned.iter().join(", "),
"Pruned the block store"
);
}
Expand Down Expand Up @@ -371,6 +395,7 @@ impl Actor for StarknetHost {

while let Some(part) = rx_part.recv().await {
state.host.part_store.store(height, round, part.clone());

if state.host.params.value_payload.include_parts() {
debug!(%stream_id, %sequence, "Broadcasting proposal part");

Expand Down Expand Up @@ -404,17 +429,28 @@ impl Actor for StarknetHost {

let parts = state.host.part_store.all_parts(height, round);

let extension = state.host.generate_vote_extension(height, round);
let Some((value, block)) =
state.build_block_from_parts(&parts, height, round).await
else {
error!(%height, %round, "Failed to build block from parts");
return Ok(());
};

if let Some(value) = state.build_value_from_parts(&parts, height, round).await {
reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
extension,
))?;
if let Err(e) = state
.block_store
.store_undecided_block(value.height, value.round, block)
.await
{
error!(%e, %height, %round, "Failed to store the proposed block");
}

reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
value.extension,
))?;

Ok(())
}

Expand Down Expand Up @@ -545,7 +581,11 @@ impl Actor for StarknetHost {
}

// Build the block from transaction parts and certificate, and store it
if let Err(e) = state.block_store.store(&certificate, &all_txes).await {
if let Err(e) = state
.block_store
.store_decided_block(&certificate, &all_txes)
.await
{
error!(%e, %height, %round, "Failed to store the block");
}

Expand Down
Loading

0 comments on commit fc3ef84

Please sign in to comment.