Skip to content

Commit

Permalink
Implement RestreamProposal in example and test app
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Jan 14, 2025
1 parent 770f2bd commit b6e25a5
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 16 deletions.
2 changes: 1 addition & 1 deletion code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
reply_to.send(rx.await?)?;
}

HostMsg::RestreamValue {
HostMsg::RestreamProposal {
height,
round,
valid_round,
Expand Down
4 changes: 2 additions & 2 deletions code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ where

/// Requests the application to re-stream a proposal that it has already seen.
///
/// The application MUST re-publish again to its pwers all
/// The application MUST re-publish again to its peers all
/// the proposal parts pertaining to that value.
///
/// Resume with: [`resume::Continue`]
RestreamValue(
RestreamProposal(
/// Height of the value
Ctx::Height,
/// Round of the value
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where
if signed_proposal.pol_round().is_defined() {
perform!(
co,
Effect::RestreamValue(
Effect::RestreamProposal(
signed_proposal.height(),
signed_proposal.round(),
signed_proposal.pol_round(),
Expand Down
4 changes: 2 additions & 2 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,9 @@ where
Ok(r.resume_with(validator_set))
}

Effect::RestreamValue(height, round, valid_round, address, value_id, r) => {
Effect::RestreamProposal(height, round, valid_round, address, value_id, r) => {
self.host
.cast(HostMsg::RestreamValue {
.cast(HostMsg::RestreamProposal {
height,
round,
valid_round,
Expand Down
6 changes: 3 additions & 3 deletions code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ pub enum HostMsg<Ctx: Context> {
proposer: Ctx::Address,
},

/// Request to build a local block/value from Driver
/// Request to build a local value to propose
GetValue {
height: Ctx::Height,
round: Round,
timeout: Duration,
reply_to: RpcReplyPort<LocallyProposedValue<Ctx>>,
},

/// Request to restream an existing block/value from Driver
RestreamValue {
/// Request to restream an existing proposal
RestreamProposal {
height: Ctx::Height,
round: Round,
valid_round: Round,
Expand Down
6 changes: 3 additions & 3 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ impl Host {
reply_to,
} => on_get_value(state, &self.network, height, round, timeout, reply_to).await,

HostMsg::RestreamValue {
HostMsg::RestreamProposal {
height,
round,
valid_round,
address,
value_id,
} => {
on_restream_value(
on_restream_proposal(
state,
&self.network,
height,
Expand Down Expand Up @@ -373,7 +373,7 @@ async fn find_previously_built_value(
Ok(proposed_value)
}

async fn on_restream_value(
async fn on_restream_proposal(
state: &mut HostState,
network: &NetworkRef<MockContext>,
height: Height,
Expand Down
27 changes: 25 additions & 2 deletions code/crates/test/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,31 @@ pub async fn run(
}
}

AppMsg::RestreamProposal { .. } => {
error!("RestreamProposal not implemented");
AppMsg::RestreamProposal {
height,
round,
valid_round,
address,
value_id,
} => {
info!(%height, %round, %value_id, "Restreaming existing proposal...");

let Some(proposal) = state
.get_proposal(height, round, valid_round, address, value_id)
.await
else {
error!(%height, %round, %value_id, "Failed to find proposal to restream");
return Ok(());
};

for stream_message in state.stream_proposal(proposal) {
info!(%height, %round, %value_id, "Publishing proposal part: {stream_message:?}");

channels
.network
.send(NetworkMsg::PublishProposalPart(stream_message))
.await?;
}
}

AppMsg::PeerJoined { peer_id } => {
Expand Down
17 changes: 17 additions & 0 deletions code/crates/test/app/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use malachitebft_app_channel::app::types::PeerId;
use malachitebft_test::codec::proto::ProtobufCodec;
use malachitebft_test::{
Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value,
ValueId,
};

use crate::store::{DecidedValue, Store};
Expand Down Expand Up @@ -218,6 +219,22 @@ impl State {
Value::new(value)
}

pub async fn get_proposal(
&self,
height: Height,
round: Round,
_valid_round: Round,
_proposer: Address,
value_id: ValueId,
) -> Option<LocallyProposedValue<TestContext>> {
Some(LocallyProposedValue::new(
height,
round,
Value::new(value_id.as_u64()),
None,
))
}

/// Creates a new proposal value for the given height
/// Returns either a previously built proposal or creates a new one
pub async fn propose_value(
Expand Down
27 changes: 25 additions & 2 deletions code/examples/channel/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,31 @@ pub async fn run(
}
}

AppMsg::RestreamProposal { .. } => {
error!("RestreamProposal not implemented");
AppMsg::RestreamProposal {
height,
round,
valid_round,
address,
value_id,
} => {
info!(%height, %round, "Restreaming existing proposal...");

let Some(proposal) = state
.get_proposal(height, round, valid_round, address, value_id)
.await
else {
error!(%height, %round, "Failed to find proposal to restream");
return Ok(());
};

for stream_message in state.stream_proposal(proposal) {
info!(%height, %round, "Publishing proposal part: {stream_message:?}");

channels
.network
.send(NetworkMsg::PublishProposalPart(stream_message))
.await?;
}
}

AppMsg::PeerJoined { peer_id } => {
Expand Down
17 changes: 17 additions & 0 deletions code/examples/channel/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use malachitebft_app_channel::app::types::PeerId;
use malachitebft_test::codec::proto::ProtobufCodec;
use malachitebft_test::{
Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value,
ValueId,
};

use crate::store::{DecidedValue, Store};
Expand Down Expand Up @@ -300,6 +301,22 @@ impl State {

parts
}

pub async fn get_proposal(
&self,
height: Height,
round: Round,
_valid_round: Round,
_proposer: Address,
value_id: ValueId,
) -> Option<LocallyProposedValue<TestContext>> {
Some(LocallyProposedValue::new(
height,
round,
Value::new(value_id.as_u64()),
None,
))
}
}

/// Re-assemble a [`ProposedValue`] from its [`ProposalParts`].
Expand Down

0 comments on commit b6e25a5

Please sign in to comment.