diff --git a/CHANGELOG.md b/CHANGELOG.md index c88dec1fa2f..da8f8b61271 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [1983](https://github.com/FuelLabs/fuel-core/pull/1983): Add adapters for gas price service for accessing database values +- [2046](https://github.com/FuelLabs/fuel-core/pull/2046): Added limit for the number of connections per same remote IP. ### Breaking - [2025](https://github.com/FuelLabs/fuel-core/pull/2025): Add new V0 algorithm for gas price to services. diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index f8b5797c214..39cdd163720 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -90,6 +90,10 @@ pub struct P2PArgs { #[clap(long = "max-peers-connected", default_value = "50", env)] pub max_peers_connected: u32, + /// Max number of unique peers connected with the same remote IP address. + #[clap(long = "max-peers-per-remote-ip", default_value = "5", env)] + pub max_peers_per_remote_ip: usize, + /// Max number of connections per single peer /// The total number of connections will be `(max_peers_connected + reserved_nodes.len()) * max_connections_per_peer` #[clap(long = "max-connections-per-peer", default_value = "3", env)] @@ -309,6 +313,7 @@ impl P2PArgs { reserved_nodes_only_mode: self.reserved_nodes_only_mode, enable_mdns: self.enable_mdns, max_peers_connected: self.max_peers_connected, + max_peers_per_remote_ip: self.max_peers_per_remote_ip, max_connections_per_peer: self.max_connections_per_peer, allow_private_addresses: self.allow_private_addresses, random_walk, diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index a05f7c4a51d..f853c92f9d8 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -91,6 +91,8 @@ pub struct Config { /// This number should be at least number of `mesh_n` from `Gossipsub` configuration. /// The total number of connections will be `(max_peers_connected + reserved_nodes.len()) * max_connections_per_peer` pub max_peers_connected: u32, + /// Max number of unique peers connected with the same remote IP address. + pub max_peers_per_remote_ip: usize, /// Max number of connections per single peer /// The total number of connections will be `(max_peers_connected + reserved_nodes.len()) * max_connections_per_peer` pub max_connections_per_peer: u32, @@ -154,6 +156,7 @@ impl Config { bootstrap_nodes: self.bootstrap_nodes, enable_mdns: self.enable_mdns, max_peers_connected: self.max_peers_connected, + max_peers_per_remote_ip: self.max_peers_per_remote_ip, max_connections_per_peer: self.max_connections_per_peer, allow_private_addresses: self.allow_private_addresses, random_walk: self.random_walk, @@ -203,6 +206,7 @@ impl Config { bootstrap_nodes: vec![], enable_mdns: false, max_peers_connected: 50, + max_peers_per_remote_ip: 50, max_connections_per_peer: 3, allow_private_addresses: true, random_walk: Some(Duration::from_millis(500)), diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 41ef9fb8109..7ed718cb906 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -544,6 +544,9 @@ impl FuelP2PService { self.peer_manager.handle_peer_disconnect(peer_id); return Some(FuelP2PEvent::PeerDisconnected(peer_id)); } + PeerReportEvent::AskForDisconnection { peer_id } => { + let _ = self.swarm.disconnect_peer_id(peer_id); + } } None } @@ -748,6 +751,11 @@ mod tests { }; use futures::{ future::join_all, + stream::{ + select_all, + BoxStream, + }, + FutureExt, StreamExt, }; use libp2p::{ @@ -772,6 +780,8 @@ mod tests { mpsc, oneshot, watch, + Mutex, + OwnedMutexGuard, }; use tracing_attributes::instrument; @@ -1186,6 +1196,128 @@ mod tests { } } + fn any_event_from_node<'a>( + nodes: Vec, + ) -> BoxStream<'a, (OwnedMutexGuard, FuelP2PEvent)> { + use futures::stream; + + let streams = nodes.into_iter().map(|node| { + let node = Arc::new(Mutex::new(node)); + + stream::unfold(node, |node| { + async move { + let new_node = node.clone(); + let mut lock = node.lock_owned().await; + loop { + let event = lock.next_event().await; + if let Some(event) = event { + return Some(((lock, event), new_node)); + } + } + } + .boxed() + }) + }); + + select_all(streams).boxed() + } + + #[tokio::test] + #[instrument] + async fn limited_number_of_connections_per_remote_ip() { + const LIMIT: usize = 10; + + // Node A + let mut p2p_config = + Config::default_initialized("limited_number_of_connections_per_remote_ip"); + p2p_config.max_peers_per_remote_ip = LIMIT; + + let mut target_node = build_service_from_config(p2p_config.clone()).await; + + p2p_config.bootstrap_nodes = target_node.multiaddrs(); + + let mut nodes_that_fit_into_limit = vec![]; + for _ in 0..LIMIT { + let node = build_service_from_config(p2p_config.clone()).await; + nodes_that_fit_into_limit.push(node); + } + + let mut good_nodes = any_event_from_node(nodes_that_fit_into_limit); + let mut connected_nodes: usize = 0; + + loop { + tokio::select! { + target_node_event = target_node.next_event() => { + if let Some(event) = target_node_event { + tracing::info!("Target node Event: {:?}", event); + match event { + FuelP2PEvent::PeerConnected(_) => { + connected_nodes = connected_nodes.saturating_add(1); + } + _ => { + // Do nothing + } + } + + if connected_nodes >= LIMIT { + break + } + } + }, + event = good_nodes.next() => { + if let Some((good_node, event)) = event { + tracing::info!("Good node {:?}: {:?}", good_node.local_peer_id, event); + + match event { + FuelP2PEvent::PeerDisconnected(_) => { + panic!("Good node should not disconnect"); + } + _ => { + // Do nothing + } + } + } + }, + } + } + + let mut nodes_that_not_fit_into_limit = vec![]; + for _ in 0..LIMIT { + let node = build_service_from_config(p2p_config.clone()).await; + nodes_that_not_fit_into_limit.push(node); + } + let new_nodes = any_event_from_node(nodes_that_not_fit_into_limit); + let mut nodes = select_all(vec![good_nodes, new_nodes]); + let mut rejected_nodes = HashSet::new(); + + loop { + tokio::select! { + target_node_event = target_node.next_event() => { + tracing::info!("Target node Event: {:?}", target_node_event); + assert!(target_node.peer_manager.get_peers_ids().count() <= LIMIT); + }, + node_event = nodes.next() => { + if let Some((node, event)) = node_event { + tracing::info!("Node {:?}: {:?}", node.local_peer_id, event); + + match event { + FuelP2PEvent::PeerDisconnected(_) => { + rejected_nodes.insert(node.local_peer_id); + } + _ => { + // Do nothing + } + } + + if rejected_nodes.len() == LIMIT { + break + } + } + }, + } + } + } + // Simulates 2 p2p nodes that connect to each other and consequently exchange Peer Info // On successful connection, node B updates its latest BlockHeight // and shares it with Peer A via Heartbeat protocol diff --git a/crates/services/p2p/src/peer_report.rs b/crates/services/p2p/src/peer_report.rs index 2c4579d7e2b..dde7367a3c7 100644 --- a/crates/services/p2p/src/peer_report.rs +++ b/crates/services/p2p/src/peer_report.rs @@ -1,10 +1,17 @@ use crate::{ config::Config, + Protocol::{ + Ip4, + Ip6, + }, TryPeerId, }; use libp2p::{ self, - core::Endpoint, + core::{ + ConnectedPoint, + Endpoint, + }, swarm::{ derive_prelude::{ ConnectionClosed, @@ -29,10 +36,13 @@ use libp2p::{ }; use std::{ collections::{ + hash_map::Entry, BTreeMap, + HashMap, HashSet, VecDeque, }, + net::IpAddr, task::{ Context, Poll, @@ -60,6 +70,9 @@ pub enum PeerReportEvent { PeerDisconnected { peer_id: PeerId, }, + AskForDisconnection { + peer_id: PeerId, + }, /// Informs p2p service / PeerManager to perform reputation decay of connected nodes PerformDecay, } @@ -71,15 +84,17 @@ pub struct Behaviour { connected_reserved_nodes: HashSet, pending_connections: HashSet, pending_events: VecDeque>, + ips_to_peers: HashMap>, + max_connections_per_ip: usize, decay_interval: Interval, } impl Behaviour { - pub(crate) fn new(_config: &Config) -> Self { + pub(crate) fn new(config: &Config) -> Self { let mut reserved_nodes_to_connect = VecDeque::new(); let mut reserved_nodes_multiaddr = BTreeMap::>::new(); - for multiaddr in &_config.reserved_nodes { + for multiaddr in &config.reserved_nodes { let peer_id = multiaddr.try_to_peer_id().unwrap(); reserved_nodes_to_connect.push_back((Instant::now(), peer_id)); reserved_nodes_multiaddr @@ -94,6 +109,8 @@ impl Behaviour { connected_reserved_nodes: Default::default(), pending_connections: Default::default(), pending_events: VecDeque::default(), + ips_to_peers: Default::default(), + max_connections_per_ip: config.max_peers_per_remote_ip, decay_interval: time::interval(Duration::from_secs( REPUTATION_DECAY_INTERVAL_IN_SECONDS, )), @@ -101,6 +118,14 @@ impl Behaviour { } } +fn get_ip_addr(addr: &Multiaddr) -> Option { + addr.iter().find_map(|p| match p { + Ip4(addr) => Some(IpAddr::V4(addr)), + Ip6(addr) => Some(IpAddr::V6(addr)), + _ => None, + }) +} + impl NetworkBehaviour for Behaviour { type ConnectionHandler = dummy::ConnectionHandler; type ToSwarm = PeerReportEvent; @@ -108,10 +133,31 @@ impl NetworkBehaviour for Behaviour { fn handle_established_inbound_connection( &mut self, _connection_id: ConnectionId, - _peer: PeerId, + peer: PeerId, _local_addr: &Multiaddr, - _remote_addr: &Multiaddr, + remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { + if let Some(ip) = get_ip_addr(remote_addr) { + let entry = self.ips_to_peers.entry(ip); + + match entry { + Entry::Occupied(mut occupied) => { + let set = occupied.get_mut(); + + if set.len() >= self.max_connections_per_ip { + return Err(ConnectionDenied::new( + "Max connections per IP reached", + )) + } + + set.insert(peer); + } + Entry::Vacant(_) => { + // Do nothing + } + } + } + Ok(dummy::ConnectionHandler) } @@ -131,6 +177,7 @@ impl NetworkBehaviour for Behaviour { let ConnectionEstablished { peer_id, connection_id, + endpoint, .. } = connection_established; self.pending_events.push_back(ToSwarm::GenerateEvent( @@ -140,11 +187,30 @@ impl NetworkBehaviour for Behaviour { self.connected_reserved_nodes.insert(peer_id); self.pending_connections.remove(&connection_id); } + + match endpoint { + ConnectedPoint::Dialer { .. } => {} + ConnectedPoint::Listener { send_back_addr, .. } => { + let ip = get_ip_addr(send_back_addr); + + if let Some(ip) = ip { + let set = self.ips_to_peers.entry(ip).or_default(); + set.insert(peer_id); + + if set.len() > self.max_connections_per_ip { + self.pending_events.push_back(ToSwarm::GenerateEvent( + PeerReportEvent::AskForDisconnection { peer_id }, + )); + } + } + } + } } FromSwarm::ConnectionClosed(connection_closed) => { let ConnectionClosed { remaining_established, peer_id, + endpoint, .. } = connection_closed; @@ -160,6 +226,35 @@ impl NetworkBehaviour for Behaviour { .push_back((Instant::now(), peer_id)); } } + + match endpoint { + ConnectedPoint::Dialer { .. } => {} + ConnectedPoint::Listener { send_back_addr, .. } => { + let ip = get_ip_addr(send_back_addr); + + if let Some(ip) = ip { + let entry = self.ips_to_peers.entry(ip); + + match entry { + Entry::Occupied(mut occupied) => { + let set = occupied.get_mut(); + set.remove(&peer_id); + + if set.is_empty() { + occupied.remove(); + } + } + Entry::Vacant(_) => { + tracing::warn!( + "IP address not found in the \ + map during disconnection of the peer {:?}", + peer_id + ) + } + } + } + } + } } FromSwarm::DialFailure(dial) => { tracing::error!(