Skip to content

Commit

Permalink
fix(relay_manager): filter out bad nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed May 8, 2024
1 parent 38ed230 commit 555b020
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 13 deletions.
10 changes: 6 additions & 4 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub(crate) type PendingGetRecord = HashMap<
),
>;

/// Tracks the issues reported against peers, keyed by `PeerId`.
///
/// Per node we keep at most 10 issues (capped to avoid a memory leak), each
/// paired with the `Instant` it was recorded. The boolean flag indicates
/// whether the node is currently considered bad.
pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;

/// What is the largest packet to send over the network.
/// Records larger than this will be rejected.
// TODO: revisit once cashnote_redemption is in
Expand Down Expand Up @@ -655,9 +659,7 @@ pub struct SwarmDriver {
handling_statistics: BTreeMap<String, Vec<Duration>>,
handled_times: usize,
pub(crate) hard_disk_write_error: usize,
// 10 is the max number of issues per node we track to avoid mem leaks
// the boolean flag to indicate whether the node is considered as bad or not
pub(crate) bad_nodes: BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>,
pub(crate) bad_nodes: BadNodes,
pub(crate) bad_nodes_ongoing_verifications: BTreeSet<PeerId>,
pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
}
Expand Down Expand Up @@ -715,7 +717,7 @@ impl SwarmDriver {
}
}
}
_ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm),
_ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion sn_networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl SwarmDriver {
&peer_id,
&addrs,
&info.protocols,
&self.bad_nodes,
);

// When received an identify from un-dialed peer, try to dial it
Expand Down Expand Up @@ -603,7 +604,7 @@ impl SwarmDriver {
}

// skip if the peer is a relay server that we're connected to
if self.relay_manager.keep_alive_peer(peer_id) {
if self.relay_manager.keep_alive_peer(peer_id, &self.bad_nodes) {
continue;
}

Expand Down
49 changes: 41 additions & 8 deletions sn_networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::driver::NodeBehaviour;
use crate::driver::{BadNodes, NodeBehaviour};
use libp2p::{
core::transport::ListenerId, multiaddr::Protocol, Multiaddr, PeerId, StreamProtocol, Swarm,
};
Expand Down Expand Up @@ -64,9 +64,19 @@ impl RelayManager {
}

/// Should we keep this peer alive?
pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId) -> bool {
self.connected_relays.contains_key(peer_id)
|| self.waiting_for_reservation.contains_key(peer_id)
/// If a peer is considered as a bad node, closing it's connection would remove that server from the listen addr.
#[allow(clippy::nonminimal_bool)]
pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId, bad_nodes: &BadNodes) -> bool {
let is_not_bad = if let Some((_, is_bad)) = bad_nodes.get(peer_id) {
!*is_bad
} else {
true
};

// we disconnect from bad server
(self.connected_relays.contains_key(peer_id) && is_not_bad)
|| (self.waiting_for_reservation.contains_key(peer_id) && is_not_bad)
// but servers provide connections to bad nodes.
|| self.reserved_by.contains(peer_id)
}

Expand All @@ -77,12 +87,20 @@ impl RelayManager {
peer_id: &PeerId,
addrs: &HashSet<Multiaddr>,
stream_protocols: &Vec<StreamProtocol>,
bad_nodes: &BadNodes,
) {
if self.candidates.len() >= MAX_POTENTIAL_CANDIDATES {
trace!("Got max relay candidates");
return;
}

if let Some((_, is_bad)) = bad_nodes.get(peer_id) {
if *is_bad {
debug!("Not adding peer {peer_id:?} as relay candidate as it is a bad node.");
return;
}
}

if Self::does_it_support_relay_server_protocol(stream_protocols) {
// todo: collect and manage multiple addrs
if let Some(addr) = addrs.iter().next() {
Expand All @@ -106,7 +124,11 @@ impl RelayManager {
// todo: how do we know if a reservation has been revoked / if the peer has gone offline?
/// Try connecting to candidate relays if we are below the threshold connections.
/// This is run periodically on a loop.
pub(crate) fn try_connecting_to_relay(&mut self, swarm: &mut Swarm<NodeBehaviour>) {
pub(crate) fn try_connecting_to_relay(
&mut self,
swarm: &mut Swarm<NodeBehaviour>,
bad_nodes: &BadNodes,
) {
if !self.enable_client {
return;
}
Expand All @@ -133,6 +155,14 @@ impl RelayManager {
};

if let Some((peer_id, relay_addr)) = self.candidates.remove(index) {
// skip if detected as a bad node
if let Some((_, is_bad)) = bad_nodes.get(&peer_id) {
if *is_bad {
trace!("Peer {peer_id:?} is considered as a bad node. Skipping it.");
continue;
}
}

if self.connected_relays.contains_key(&peer_id)
|| self.waiting_for_reservation.contains_key(&peer_id)
{
Expand Down Expand Up @@ -209,10 +239,13 @@ impl RelayManager {
};
info!("Removing external addr: {addr_with_self_peer_id:?}");
swarm.remove_external_address(&addr_with_self_peer_id);
} else if let Some(addr) = self.waiting_for_reservation.remove(&peer_id) {
}
if let Some(addr) = self.waiting_for_reservation.remove(&peer_id) {
info!("Removed peer form waiting_for_reservation as the listener has been closed {peer_id:?}: {addr:?}");
} else {
warn!("Could not find the listen addr after making reservation to the same");
trace!(
"waiting_for_reservation len: {:?}",
self.waiting_for_reservation.len()
)
}
}

Expand Down

0 comments on commit 555b020

Please sign in to comment.