Skip to content

Commit

Permalink
Broadcast BlockSync status instead of gossiping it
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Nov 25, 2024
1 parent d10ca2d commit b95a7d1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
2 changes: 1 addition & 1 deletion code/crates/actors/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ where
let earliest_block_height = self.get_earliest_block_height().await?;

self.gossip
.cast(GossipConsensusMsg::PublishStatus(Status::new(
.cast(GossipConsensusMsg::BroadcastStatus(Status::new(
height,
earliest_block_height,
)))?;
Expand Down
6 changes: 3 additions & 3 deletions code/crates/actors/src/gossip_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub enum Msg<Ctx: Context> {
PublishProposalPart(StreamMessage<Ctx::ProposalPart>),

/// Publish status
PublishStatus(Status<Ctx>),
BroadcastStatus(Status<Ctx>),

/// Send a request to a peer, returning the outbound request ID
OutgoingBlockSyncRequest(PeerId, Request<Ctx>, RpcReplyPort<OutboundRequestId>),
Expand Down Expand Up @@ -241,7 +241,7 @@ where
}
}

Msg::PublishStatus(status) => {
Msg::BroadcastStatus(status) => {
let status = blocksync::Status {
peer_id: ctrl_handle.peer_id(),
height: status.height,
Expand All @@ -250,7 +250,7 @@ where

let data = self.codec.encode(status);
match data {
Ok(data) => ctrl_handle.publish(Channel::BlockSync, data).await?,
Ok(data) => ctrl_handle.broadcast(Channel::BlockSync, data).await?,
Err(e) => error!("Failed to encode status message: {e:?}"),
}
}
Expand Down
5 changes: 5 additions & 0 deletions code/crates/gossip-consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ impl CtrlHandle {
Ok(())
}

pub async fn broadcast(&self, channel: Channel, data: Bytes) -> Result<(), eyre::Report> {
self.tx_ctrl.send(CtrlMsg::Broadcast(channel, data)).await?;
Ok(())
}

pub async fn blocksync_request(
&self,
peer_id: PeerId,
Expand Down
19 changes: 17 additions & 2 deletions code/crates/gossip-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,20 @@ pub enum TransportProtocol {
#[derive(Clone, Debug)]
pub enum Event {
Listening(Multiaddr),
Message(Channel, PeerId, Bytes),
BlockSync(blocksync::RawMessage),
PeerConnected(PeerId),
PeerDisconnected(PeerId),
Message(Channel, PeerId, Bytes),
BlockSync(blocksync::RawMessage),
}

#[derive(Debug)]
pub enum CtrlMsg {
Publish(Channel, Bytes),
Broadcast(Channel, Bytes),

BlockSyncRequest(PeerId, Bytes, oneshot::Sender<OutboundRequestId>),
BlockSyncReply(InboundRequestId, Bytes),

Shutdown,
}

Expand Down Expand Up @@ -270,6 +273,18 @@ async fn handle_ctrl_msg(

match result {
Ok(()) => debug!(%channel, size = %msg_size, "Published message"),
Err(e) => error!(%channel, "Error publishing message: {e}"),
}

ControlFlow::Continue(())
}

CtrlMsg::Broadcast(channel, data) => {
let msg_size = data.len();
let result = pubsub::publish(swarm, PubSubProtocol::Broadcast, channel, data);

match result {
Ok(()) => debug!(%channel, size = %msg_size, "Broadcasted message"),
Err(e) => error!(%channel, "Error broadcasting message: {e}"),
}

Expand Down

0 comments on commit b95a7d1

Please sign in to comment.