Skip to content

Commit

Permalink
chore(code/consensus): Remove SyncedBlock effect and rename some in…
Browse files Browse the repository at this point in the history
…puts (#593)

* chore(code/consensus): Remove `SyncedBlock` effect, rename `ReceivedSyncedBlock` input to `CommitCertificate`

* Rename `ProposeValue` input to `Propose` to avoid ambiguity with `ProposedValue` input

---------
Co-authored-by: Anca Zamfir <[email protected]>
  • Loading branch information
romac authored Nov 22, 2024
1 parent ced48f9 commit a37d307
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 117 deletions.
1 change: 0 additions & 1 deletion code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 23 additions & 31 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use malachite_common::{
CommitCertificate, Context, Round, SignedExtension, Timeout, TimeoutStep, ValidatorSet,
};
use malachite_config::TimeoutConfig;
use malachite_consensus::{Effect, Resume};
use malachite_consensus::{Effect, Resume, ValueToPropose};
use malachite_metrics::Metrics;

use crate::block_sync::BlockSyncRef;
Expand Down Expand Up @@ -233,7 +233,13 @@ where
.process_input(
&myself,
state,
ConsensusInput::ProposeValue(height, round, Round::Nil, value, extension),
ConsensusInput::Propose(ValueToPropose {
height,
round,
valid_round: Round::Nil,
value,
extension,
}),
)
.await;

Expand Down Expand Up @@ -296,14 +302,24 @@ where
return Ok(());
};

self.host.call_and_forward(
|reply_to| HostMsg::ProcessSyncedBlock {
height: block.certificate.height,
round: block.certificate.round,
validator_address: state.consensus.address().clone(),
block_bytes: block.block_bytes.clone(),
reply_to,
},
&myself,
|proposed| Msg::<Ctx>::ReceivedProposedValue(proposed),
None,
)?;

if let Err(e) = self
.process_input(
&myself,
state,
ConsensusInput::ReceivedSyncedBlock(
block.block_bytes,
block.certificate,
),
ConsensusInput::CommitCertificate(block.certificate),
)
.await
{
Expand Down Expand Up @@ -404,7 +420,7 @@ where

Msg::ReceivedProposedValue(value) => {
let result = self
.process_input(&myself, state, ConsensusInput::ReceivedProposedValue(value))
.process_input(&myself, state, ConsensusInput::ProposedValue(value))
.await;

if let Err(e) = result {
Expand Down Expand Up @@ -601,30 +617,6 @@ where

Ok(Resume::Continue)
}

Effect::SyncedBlock {
height,
round,
validator_address,
block_bytes,
} => {
debug!(%height, "Consensus received synced block, sending to host");

self.host.call_and_forward(
|reply_to| HostMsg::ProcessSyncedBlockBytes {
height,
round,
validator_address,
block_bytes,
reply_to,
},
myself,
|proposed| Msg::<Ctx>::ReceivedProposedValue(proposed),
None,
)?;

Ok(Resume::Continue)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion code/crates/actors/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub enum HostMsg<Ctx: Context> {
},

// Synced block
ProcessSyncedBlockBytes {
ProcessSyncedBlock {
height: Ctx::Height,
round: Round,
validator_address: Ctx::Address,
Expand Down
1 change: 0 additions & 1 deletion code/crates/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ malachite-driver.workspace = true
malachite-metrics.workspace = true

async-recursion = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
genawaiter = { workspace = true }
derive-where = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
Expand Down
10 changes: 0 additions & 10 deletions code/crates/consensus/src/effect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Bytes;
use derive_where::derive_where;

use malachite_common::*;
Expand Down Expand Up @@ -62,15 +61,6 @@ where
/// Consensus has decided on a value
/// Resume with: [`Resume::Continue`]
Decide { certificate: CommitCertificate<Ctx> },

/// Consensus has received a synced decided block
/// Resume with: [`Resume::Continue`]
SyncedBlock {
height: Ctx::Height,
round: Round,
validator_address: Ctx::Address,
block_bytes: Bytes,
},
}

/// A value with which the consensus process can be resumed after yielding an [`Effect`].
Expand Down
34 changes: 10 additions & 24 deletions code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use crate::prelude::*;
mod decide;
mod driver;
mod proposal;
mod propose_value;
mod received_proposed_value;
mod propose;
mod proposed_value;
mod signature;
mod start_height;
mod synced_block;
mod sync;
mod timeout;
mod validator_set;
mod vote;

use proposal::on_proposal;
use propose_value::propose_value;
use received_proposed_value::on_received_proposed_value;
use propose::on_propose;
use proposed_value::on_proposed_value;
use start_height::reset_and_start_height;
use synced_block::on_received_synced_block;
use sync::on_commit_certificate;
use timeout::on_timeout_elapsed;
use vote::on_vote;

Expand Down Expand Up @@ -48,25 +48,11 @@ where
}
Input::Vote(vote) => on_vote(co, state, metrics, vote).await,
Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Input::ProposeValue(height, round, valid_round, value, extension) => {
propose_value(
co,
state,
metrics,
height,
round,
valid_round,
value,
extension,
)
.await
}
Input::Propose(value) => on_propose(co, state, metrics, value).await,
Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await,
Input::ReceivedProposedValue(value) => {
on_received_proposed_value(co, state, metrics, value).await
}
Input::ReceivedSyncedBlock(block_bytes, commits) => {
on_received_synced_block(co, state, metrics, block_bytes, commits).await
Input::ProposedValue(value) => on_proposed_value(co, state, metrics, value).await,
Input::CommitCertificate(certificate) => {
on_commit_certificate(co, state, metrics, certificate).await
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use crate::prelude::*;

use crate::handle::driver::apply_driver_input;
use crate::types::ProposedValue;
use crate::types::{ProposedValue, ValueToPropose};

#[allow(clippy::too_many_arguments)]
pub async fn propose_value<Ctx>(
pub async fn on_propose<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
height: Ctx::Height,
round: Round,
valid_round: Round,
value: Ctx::Value,
extension: Option<SignedExtension<Ctx>>,
value: ValueToPropose<Ctx>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
let ValueToPropose {
height,
round,
valid_round,
value,
extension,
} = value;

if state.driver.height() != height {
warn!(
"Ignoring proposal for height {height}, current height: {}",
Expand All @@ -41,7 +44,7 @@ where
height,
round,
valid_round,
validator_address: state.driver.address().clone(),
validator_address: state.address().clone(),
value: value.clone(),
validity: Validity::Valid,
extension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::types::ProposedValue;
id = %proposed_value.value.id()
)
)]
pub async fn on_received_proposed_value<Ctx>(
pub async fn on_proposed_value<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
Expand All @@ -31,7 +31,7 @@ where
debug!("Received value for next height, queuing for later");
state
.input_queue
.push_back(Input::ReceivedProposedValue(proposed_value));
.push_back(Input::ProposedValue(proposed_value));
}
return Ok(());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::borrow::Borrow;

use bytes::Bytes;

use crate::handle::driver::apply_driver_input;
use crate::handle::validator_set::get_validator_set;
use crate::prelude::*;

pub async fn on_received_synced_block<Ctx>(
pub async fn on_commit_certificate<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
block_bytes: Bytes,
certificate: CommitCertificate<Ctx>,
) -> Result<(), Error<Ctx>>
where
Expand All @@ -22,16 +19,6 @@ where
"Processing certificate"
);

perform!(
co,
Effect::SyncedBlock {
height: certificate.height,
round: certificate.round,
validator_address: state.driver.address().clone(),
block_bytes,
}
);

let Some(validator_set) = get_validator_set(co, state, certificate.height).await? else {
return Err(Error::ValidatorSetNotFound(certificate.height));
};
Expand Down
27 changes: 7 additions & 20 deletions code/crates/consensus/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use bytes::Bytes;
use derive_where::derive_where;

use malachite_common::{
CommitCertificate, Context, Round, SignedExtension, SignedProposal, SignedVote, Timeout,
};
use malachite_common::{CommitCertificate, Context, SignedProposal, SignedVote, Timeout};

use crate::types::ProposedValue;
use crate::ValueToPropose;

/// Inputs to be handled by the consensus process.
#[derive_where(Clone, Debug, PartialEq, Eq)]
Expand All @@ -23,25 +21,14 @@ where
Proposal(SignedProposal<Ctx>),

/// Propose a value
ProposeValue(
/// Height
Ctx::Height,
/// Round
Round,
/// Valid round
Round,
/// Value
Ctx::Value,
/// Signed vote extension
Option<SignedExtension<Ctx>>,
),
Propose(ValueToPropose<Ctx>),

/// A timeout has elapsed
TimeoutElapsed(Timeout),

/// The value corresponding to a proposal has been received
ReceivedProposedValue(ProposedValue<Ctx>),
/// Received the full proposed value corresponding to a proposal
ProposedValue(ProposedValue<Ctx>),

/// A block received via BlockSync
ReceivedSyncedBlock(Bytes, CommitCertificate<Ctx>),
/// Received a commit certificate from BlockSync
CommitCertificate(CommitCertificate<Ctx>),
}
4 changes: 4 additions & 0 deletions code/crates/consensus/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ where
}
}

pub fn address(&self) -> &Ctx::Address {
self.driver.address()
}

pub fn get_proposer(&self, height: Ctx::Height, round: Round) -> &Ctx::Address {
self.ctx
.select_proposer(self.driver.validator_set(), height, round)
Expand Down
12 changes: 11 additions & 1 deletion code/crates/consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,18 @@ pub enum ConsensusMsg<Ctx: Context> {
Proposal(Ctx::Proposal),
}

/// A value to propose by the current node.
/// Used only when the node is the proposer.
#[derive_where(Clone, Debug, PartialEq, Eq)]
pub struct ValueToPropose<Ctx: Context> {
pub height: Ctx::Height,
pub round: Round,
pub valid_round: Round,
pub value: Ctx::Value,
pub extension: Option<SignedExtension<Ctx>>,
}

/// A value proposed by a validator
/// Called at non-proposer only.
#[derive_where(Clone, Debug, PartialEq, Eq)]
pub struct ProposedValue<Ctx: Context> {
pub height: Ctx::Height,
Expand Down
4 changes: 2 additions & 2 deletions code/crates/consensus/tests/full_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn val_msg(
value: u64,
validity: Validity,
) -> Input<TestContext> {
Input::ReceivedProposedValue(ProposedValue {
Input::ProposedValue(ProposedValue {
height: Height::new(1),
round: Round::new(round),
valid_round: Round::Nil,
Expand Down Expand Up @@ -275,7 +275,7 @@ fn full_proposal_keeper_tests() {
for m in s.input {
match m {
Input::Proposal(p) => keeper.store_proposal(p),
Input::ReceivedProposedValue(v) => keeper.store_value(&v),
Input::ProposedValue(v) => keeper.store_value(&v),
_ => continue,
}
}
Expand Down
2 changes: 1 addition & 1 deletion code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl Actor for StarknetHost {
Ok(())
}

HostMsg::ProcessSyncedBlockBytes {
HostMsg::ProcessSyncedBlock {
height,
round,
validator_address,
Expand Down

0 comments on commit a37d307

Please sign in to comment.