Skip to content

Commit

Permalink
fix(network): emit NetworkEvent when we publish a gossipsub msg
Browse files Browse the repository at this point in the history
- should fix the reward test failures
- they were cuased because the `genesis_node` that we used to monitor
  the gossipsub messages from was the one who published the message
- Hence it would not emit and `NodeEvent` that we were listening for
  • Loading branch information
RolandSherwin authored and joshuef committed Oct 19, 2023
1 parent d4663a0 commit 0a4c7ef
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 7 deletions.
3 changes: 2 additions & 1 deletion sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ impl Client {
}
}
}
NetworkEvent::GossipsubMsg { topic, msg } => {
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
self.events_channel
.broadcast(ClientEvent::GossipsubMsg { topic, msg })?;
}
Expand Down
6 changes: 6 additions & 0 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ impl SwarmDriver {
.unsubscribe(&topic_id)?;
}
SwarmCmd::GossipsubPublish { topic_id, msg } => {
// If we publish a Gossipsub message, we might not receive the same message on our side.
// Hence push an event to notify that we've published a message
self.send_event(NetworkEvent::GossipsubMsgPublished {
topic: topic_id.clone(),
msg: msg.clone(),
});
let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id);
self.swarm
.behaviour_mut()
Expand Down
18 changes: 14 additions & 4 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,19 @@ pub enum NetworkEvent {
/// Report failed write to cleanup record store
FailedToWrite(RecordKey),
/// Gossipsub message received
GossipsubMsg {
GossipsubMsgReceived {
/// Topic the message was published on
topic: String,
/// The raw bytes of the received message
msg: Vec<u8>,
},
/// The Gossipsub message that we published
GossipsubMsgPublished {
/// Topic the message was published on
topic: String,
/// The raw bytes of the sent message
msg: Vec<u8>,
},
}

// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected.
Expand Down Expand Up @@ -183,8 +190,11 @@ impl Debug for NetworkEvent {
let pretty_key = PrettyPrintRecordKey::from(record_key.clone());
write!(f, "NetworkEvent::FailedToWrite({pretty_key:?})")
}
NetworkEvent::GossipsubMsg { topic, .. } => {
write!(f, "NetworkEvent::GossipsubMsg({topic})")
NetworkEvent::GossipsubMsgReceived { topic, .. } => {
write!(f, "NetworkEvent::GossipsubMsgReceived({topic})")
}
NetworkEvent::GossipsubMsgPublished { topic, .. } => {
write!(f, "NetworkEvent::GossipsubMsgPublished({topic})")
}
}
}
Expand Down Expand Up @@ -326,7 +336,7 @@ impl SwarmDriver {
libp2p::gossipsub::Event::Message { message, .. } => {
let topic = message.topic.into_string();
let msg = message.data;
self.send_event(NetworkEvent::GossipsubMsg { topic, msg });
self.send_event(NetworkEvent::GossipsubMsgReceived { topic, msg });
}
other => trace!("Gossipsub Event has been ignored: {other:?}"),
}
Expand Down
6 changes: 4 additions & 2 deletions sn_node/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ impl Node {
| NetworkEvent::PeerRemoved(_)
| NetworkEvent::NewListenAddr(_)
| NetworkEvent::NatStatusChanged(_)
| NetworkEvent::GossipsubMsg { .. } => break,
| NetworkEvent::GossipsubMsgReceived { .. }
| NetworkEvent::GossipsubMsgPublished { .. } => break,
}
}
trace!("Handling NetworkEvent {event:?}");
Expand Down Expand Up @@ -372,7 +373,8 @@ impl Node {
error!("Failed to remove local record: {e:?}");
}
}
NetworkEvent::GossipsubMsg { topic, msg } => {
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
if topic == TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially
match try_decode_transfer_notif(&msg) {
Expand Down
1 change: 1 addition & 0 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ impl Node {
// publish a notification over gossipsub topic TRANSFER_NOTIF_TOPIC for each cash-note received.
match transfer_for_us.to_hex() {
Ok(transfer_hex) => {
trace!("Publishing a gossipsub msgs for transfer_for_us: {transfer_for_us:?}");
let topic = TRANSFER_NOTIF_TOPIC.to_string();
for cash_note in cash_notes.iter() {
let pk = cash_note.unique_pubkey();
Expand Down

0 comments on commit 0a4c7ef

Please sign in to comment.