Skip to content

Commit

Permalink
fix(code/blocksync): Broadcast BlockSync status instead of gossiping …
Browse files Browse the repository at this point in the history
…it (#611)
  • Loading branch information
romac authored Nov 26, 2024
1 parent 29311f2 commit 95ad838
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 98 deletions.
17 changes: 11 additions & 6 deletions code/crates/actors/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use libp2p::PeerId;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use rand::SeedableRng;
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};

use malachite_blocksync::{self as blocksync, OutboundRequestId};
use malachite_blocksync::{Request, SyncedBlock};
use malachite_common::{CertificateError, CommitCertificate, Context};
use tracing::{debug, error, warn};
use malachite_common::{CertificateError, CommitCertificate, Context, Height};

use crate::gossip_consensus::{GossipConsensusMsg, GossipConsensusRef, GossipEvent, Status};
use crate::host::{HostMsg, HostRef};
Expand Down Expand Up @@ -86,7 +86,7 @@ pub struct Params {
impl Default for Params {
fn default() -> Self {
Self {
status_update_interval: Duration::from_secs(10),
status_update_interval: Duration::from_secs(5),
request_timeout: Duration::from_secs(10),
}
}
Expand Down Expand Up @@ -179,11 +179,11 @@ where
use blocksync::Effect;

match effect {
Effect::PublishStatus(height) => {
Effect::BroadcastStatus(height) => {
let earliest_block_height = self.get_earliest_block_height().await?;

self.gossip
.cast(GossipConsensusMsg::PublishStatus(Status::new(
.cast(GossipConsensusMsg::BroadcastStatus(Status::new(
height,
earliest_block_height,
)))?;
Expand Down Expand Up @@ -283,11 +283,16 @@ where
}

Msg::Decided(height) => {
self.process_input(&myself, state, blocksync::Input::Decided(height))
self.process_input(&myself, state, blocksync::Input::UpdateHeight(height))
.await?;
}

Msg::StartHeight(height) => {
if let Some(height) = height.decrement() {
self.process_input(&myself, state, blocksync::Input::UpdateHeight(height))
.await?;
}

self.process_input(&myself, state, blocksync::Input::StartHeight(height))
.await?;
}
Expand Down
8 changes: 4 additions & 4 deletions code/crates/actors/src/gossip_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ pub enum Msg<Ctx: Context> {
/// Publish a proposal part
PublishProposalPart(StreamMessage<Ctx::ProposalPart>),

/// Publish status
PublishStatus(Status<Ctx>),
/// Broadcast status to all direct peers
BroadcastStatus(Status<Ctx>),

/// Send a request to a peer, returning the outbound request ID
OutgoingBlockSyncRequest(PeerId, Request<Ctx>, RpcReplyPort<OutboundRequestId>),
Expand Down Expand Up @@ -241,7 +241,7 @@ where
}
}

Msg::PublishStatus(status) => {
Msg::BroadcastStatus(status) => {
let status = blocksync::Status {
peer_id: ctrl_handle.peer_id(),
height: status.height,
Expand All @@ -250,7 +250,7 @@ where

let data = self.codec.encode(status);
match data {
Ok(data) => ctrl_handle.publish(Channel::BlockSync, data).await?,
Ok(data) => ctrl_handle.broadcast(Channel::BlockSync, data).await?,
Err(e) => error!("Failed to encode status message: {e:?}"),
}
}
Expand Down
22 changes: 12 additions & 10 deletions code/crates/blocksync/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl<Ctx: Context> Default for Resume<Ctx> {

#[derive_where(Debug)]
pub enum Effect<Ctx: Context> {
/// Publish our status to the network
PublishStatus(Ctx::Height),
/// Broadcast our status to our direct peers
BroadcastStatus(Ctx::Height),

/// Send a BlockSync request to a peer
SendRequest(PeerId, Request<Ctx>),
Expand All @@ -58,7 +58,7 @@ pub enum Input<Ctx: Context> {
StartHeight(Ctx::Height),

/// Consensus just decided on a new block
Decided(Ctx::Height),
UpdateHeight(Ctx::Height),

/// A BlockSync request has been received from a peer
Request(InboundRequestId, PeerId, Request<Ctx>),
Expand Down Expand Up @@ -89,7 +89,7 @@ where
Input::Tick => on_tick(co, state, metrics).await,
Input::Status(status) => on_status(co, state, metrics, status).await,
Input::StartHeight(height) => on_start_height(co, state, metrics, height).await,
Input::Decided(height) => on_decided(co, state, metrics, height).await,
Input::UpdateHeight(height) => on_update_height(co, state, metrics, height).await,
Input::Request(request_id, peer_id, request) => {
on_request(co, state, metrics, request_id, peer_id, request).await
}
Expand Down Expand Up @@ -117,9 +117,9 @@ pub async fn on_tick<Ctx>(
where
Ctx: Context,
{
debug!(height = %state.tip_height, "Publishing status");
debug!(height = %state.tip_height, "Broadcasting status");

perform!(co, Effect::PublishStatus(state.tip_height));
perform!(co, Effect::BroadcastStatus(state.tip_height));

Ok(())
}
Expand Down Expand Up @@ -222,7 +222,7 @@ where
Ok(())
}

pub async fn on_decided<Ctx>(
pub async fn on_update_height<Ctx>(
_co: Co<Ctx>,
state: &mut State<Ctx>,
_metrics: &Metrics,
Expand All @@ -231,10 +231,12 @@ pub async fn on_decided<Ctx>(
where
Ctx: Context,
{
debug!(%height, "Decided on a block");
if state.tip_height < height {
debug!(%height, "Update height");

state.tip_height = height;
state.remove_pending_request(height);
state.tip_height = height;
state.remove_pending_request(height);
}

Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions code/crates/common/src/height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@ where
self.increment_by(1)
}

/// Decrement the height by one.
fn decrement(&self) -> Option<Self> {
self.decrement_by(1)
}

/// Increment this height by the given amount.
fn increment_by(&self, n: u64) -> Self;

/// Decrement this height by the given amount.
/// Returns None if the height would be decremented below its minimum.
fn decrement_by(&self, n: u64) -> Option<Self>;

/// Convert the height to a `u64`.
fn as_u64(&self) -> u64;
}
53 changes: 19 additions & 34 deletions code/crates/gossip-consensus/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Duration;

use either::Either;
use libp2p::request_response::{OutboundRequestId, ResponseChannel};
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::NetworkBehaviour;
Expand All @@ -14,7 +13,7 @@ use malachite_blocksync as blocksync;
use malachite_discovery as discovery;
use malachite_metrics::Registry;

use crate::{Config, GossipSubConfig, PubSubProtocol, PROTOCOL};
use crate::{Config, GossipSubConfig, PROTOCOL};

#[derive(Debug)]
pub enum NetworkEvent {
Expand Down Expand Up @@ -62,25 +61,13 @@ impl From<discovery::Event> for NetworkEvent {
}
}

impl<A, B> From<Either<A, B>> for NetworkEvent
where
A: Into<NetworkEvent>,
B: Into<NetworkEvent>,
{
fn from(event: Either<A, B>) -> Self {
match event {
Either::Left(event) => event.into(),
Either::Right(event) => event.into(),
}
}
}

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NetworkEvent")]
pub struct Behaviour {
pub identify: identify::Behaviour,
pub ping: ping::Behaviour,
pub pubsub: Either<gossipsub::Behaviour, broadcast::Behaviour>,
pub gossipsub: gossipsub::Behaviour,
pub broadcast: broadcast::Behaviour,
pub blocksync: blocksync::Behaviour,
pub discovery: Toggle<discovery::Behaviour>,
}
Expand Down Expand Up @@ -140,23 +127,20 @@ impl Behaviour {

let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(5)));

let pubsub = match config.protocol {
PubSubProtocol::GossipSub(cfg) => Either::Left(
gossipsub::Behaviour::new_with_metrics(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config(cfg, config.pubsub_max_size),
registry.sub_registry_with_prefix("gossipsub"),
Default::default(),
)
.unwrap(),
),
PubSubProtocol::Broadcast => Either::Right(broadcast::Behaviour::new_with_metrics(
broadcast::Config {
max_buf_size: config.pubsub_max_size,
},
registry.sub_registry_with_prefix("broadcast"),
)),
};
let gossipsub = gossipsub::Behaviour::new_with_metrics(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config(config.gossipsub, config.pubsub_max_size),
registry.sub_registry_with_prefix("gossipsub"),
Default::default(),
)
.unwrap();

let broadcast = broadcast::Behaviour::new_with_metrics(
broadcast::Config {
max_buf_size: config.pubsub_max_size,
},
registry.sub_registry_with_prefix("broadcast"),
);

let blocksync = blocksync::Behaviour::new_with_metrics(
blocksync::Config::default().with_max_response_size(config.rpc_max_size),
Expand All @@ -168,7 +152,8 @@ impl Behaviour {
Self {
identify,
ping,
pubsub,
gossipsub,
broadcast,
blocksync,
discovery,
}
Expand Down
4 changes: 4 additions & 0 deletions code/crates/gossip-consensus/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl Channel {
]
}

pub fn consensus() -> &'static [Channel] {
&[Channel::Consensus, Channel::ProposalParts]
}

pub fn to_gossipsub_topic(self) -> gossipsub::IdentTopic {
gossipsub::IdentTopic::new(self.as_str())
}
Expand Down
5 changes: 5 additions & 0 deletions code/crates/gossip-consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ impl CtrlHandle {
Ok(())
}

pub async fn broadcast(&self, channel: Channel, data: Bytes) -> Result<(), eyre::Report> {
self.tx_ctrl.send(CtrlMsg::Broadcast(channel, data)).await?;
Ok(())
}

pub async fn blocksync_request(
&self,
peer_id: PeerId,
Expand Down
Loading

0 comments on commit 95ad838

Please sign in to comment.