From 0a4c7ef39d357631d2db5c3393ffa99342aae3ce Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 19 Oct 2023 15:32:59 +0530 Subject: [PATCH] fix(network): emit NetworkEvent when we publish a gossipsub msg - should fix the reward test failures - they were cuased because the `genesis_node` that we used to monitor the gossipsub messages from was the one who published the message - Hence it would not emit and `NodeEvent` that we were listening for --- sn_client/src/api.rs | 3 ++- sn_networking/src/cmd.rs | 6 ++++++ sn_networking/src/event.rs | 18 ++++++++++++++---- sn_node/src/api.rs | 6 ++++-- sn_node/src/put_validation.rs | 1 + 5 files changed, 27 insertions(+), 7 deletions(-) diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 2aab38c70c..0f9f5cda84 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -218,7 +218,8 @@ impl Client { } } } - NetworkEvent::GossipsubMsg { topic, msg } => { + NetworkEvent::GossipsubMsgReceived { topic, msg } + | NetworkEvent::GossipsubMsgPublished { topic, msg } => { self.events_channel .broadcast(ClientEvent::GossipsubMsg { topic, msg })?; } diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 4f8d0eb31c..0c74788381 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -407,6 +407,12 @@ impl SwarmDriver { .unsubscribe(&topic_id)?; } SwarmCmd::GossipsubPublish { topic_id, msg } => { + // If we publish a Gossipsub message, we might not receive the same message on our side. + // Hence push an event to notify that we've published a message + self.send_event(NetworkEvent::GossipsubMsgPublished { + topic: topic_id.clone(), + msg: msg.clone(), + }); let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); self.swarm .behaviour_mut() diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index cd5d74795d..585df5ad16 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -138,12 +138,19 @@ pub enum NetworkEvent { /// Report failed write to cleanup record store FailedToWrite(RecordKey), /// Gossipsub message received - GossipsubMsg { + GossipsubMsgReceived { /// Topic the message was published on topic: String, /// The raw bytes of the received message msg: Vec, }, + /// The Gossipsub message that we published + GossipsubMsgPublished { + /// Topic the message was published on + topic: String, + /// The raw bytes of the sent message + msg: Vec, + }, } // Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected. @@ -183,8 +190,11 @@ impl Debug for NetworkEvent { let pretty_key = PrettyPrintRecordKey::from(record_key.clone()); write!(f, "NetworkEvent::FailedToWrite({pretty_key:?})") } - NetworkEvent::GossipsubMsg { topic, .. } => { - write!(f, "NetworkEvent::GossipsubMsg({topic})") + NetworkEvent::GossipsubMsgReceived { topic, .. } => { + write!(f, "NetworkEvent::GossipsubMsgReceived({topic})") + } + NetworkEvent::GossipsubMsgPublished { topic, .. } => { + write!(f, "NetworkEvent::GossipsubMsgPublished({topic})") } } } @@ -326,7 +336,7 @@ impl SwarmDriver { libp2p::gossipsub::Event::Message { message, .. } => { let topic = message.topic.into_string(); let msg = message.data; - self.send_event(NetworkEvent::GossipsubMsg { topic, msg }); + self.send_event(NetworkEvent::GossipsubMsgReceived { topic, msg }); } other => trace!("Gossipsub Event has been ignored: {other:?}"), } diff --git a/sn_node/src/api.rs b/sn_node/src/api.rs index 30d1f9168e..af82ab4355 100644 --- a/sn_node/src/api.rs +++ b/sn_node/src/api.rs @@ -292,7 +292,8 @@ impl Node { | NetworkEvent::PeerRemoved(_) | NetworkEvent::NewListenAddr(_) | NetworkEvent::NatStatusChanged(_) - | NetworkEvent::GossipsubMsg { .. } => break, + | NetworkEvent::GossipsubMsgReceived { .. } + | NetworkEvent::GossipsubMsgPublished { .. } => break, } } trace!("Handling NetworkEvent {event:?}"); @@ -372,7 +373,8 @@ impl Node { error!("Failed to remove local record: {e:?}"); } } - NetworkEvent::GossipsubMsg { topic, msg } => { + NetworkEvent::GossipsubMsgReceived { topic, msg } + | NetworkEvent::GossipsubMsgPublished { topic, msg } => { if topic == TRANSFER_NOTIF_TOPIC { // this is expected to be a notification of a transfer which we treat specially match try_decode_transfer_notif(&msg) { diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index a84c22c364..d1497eedfa 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -453,6 +453,7 @@ impl Node { // publish a notification over gossipsub topic TRANSFER_NOTIF_TOPIC for each cash-note received. match transfer_for_us.to_hex() { Ok(transfer_hex) => { + trace!("Publishing a gossipsub msgs for transfer_for_us: {transfer_for_us:?}"); let topic = TRANSFER_NOTIF_TOPIC.to_string(); for cash_note in cash_notes.iter() { let pk = cash_note.unique_pubkey();