diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs
index 21f5c96923..c104ec14cb 100644
--- a/sn_networking/src/driver.rs
+++ b/sn_networking/src/driver.rs
@@ -16,7 +16,6 @@ use crate::{
     cmd::SwarmCmd,
     error::{NetworkError, Result},
     event::{NetworkEvent, NodeEvent},
-    get_record_handler::PendingGetRecord,
     multiaddr_pop_p2p,
     network_discovery::NetworkDiscovery,
     record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
@@ -24,7 +23,7 @@ use crate::{
     relay_manager::RelayManager,
     replication_fetcher::ReplicationFetcher,
     target_arch::{interval, spawn, Instant},
-    Network, CLOSE_GROUP_SIZE,
+    GetRecordError, Network, CLOSE_GROUP_SIZE,
 };
 use crate::{transport, NodeIssue};
 use futures::future::Either;
@@ -67,6 +66,7 @@ use std::{
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::Duration;
 use tracing::warn;
+use xor_name::XorName;
 
 /// Interval over which we check for the farthest record we _should_ be holding
 /// based upon our knowledge of the CLOSE_GROUP
@@ -85,6 +85,17 @@ pub(crate) enum PendingGetClosestType {
 }
 type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<PeerId>)>;
 
+/// Using XorName to differentiate different record content under the same key.
+type GetRecordResultMap = HashMap<XorName, (Record, HashSet<PeerId>)>;
+pub(crate) type PendingGetRecord = HashMap<
+    QueryId,
+    (
+        oneshot::Sender<std::result::Result<Record, GetRecordError>>,
+        GetRecordResultMap,
+        GetRecordCfg,
+    ),
+>;
+
 /// What is the largest packet to send over the network.
 /// Records larger than this will be rejected.
 // TODO: revisit once cashnote_redemption is in
diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs
deleted file mode 100644
index 76f462051a..0000000000
--- a/sn_networking/src/event.rs
+++ /dev/null
@@ -1,1564 +0,0 @@
-// Copyright 2024 MaidSafe.net limited.
-//
-// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
-// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
-// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. Please review the Licences for the specific language governing
-// permissions and limitations relating to use of the SAFE Network Software.
- -use crate::{ - cmd::SwarmCmd, - driver::{PendingGetClosestType, SwarmDriver}, - error::{NetworkError, Result}, - multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, - target_arch::Instant, - CLOSE_GROUP_SIZE, REPLICATION_PEERS_COUNT, -}; -use core::fmt; -use custom_debug::Debug as CustomDebug; -use itertools::Itertools; -#[cfg(feature = "local-discovery")] -use libp2p::mdns; -use libp2p::{ - kad::{self, GetClosestPeersError, InboundRequest, QueryResult, Record, RecordKey, K_VALUE}, - multiaddr::Protocol, - request_response::{self, Message, ResponseChannel as PeerResponseChannel}, - swarm::{ - dial_opts::{DialOpts, PeerCondition}, - DialError, SwarmEvent, - }, - Multiaddr, PeerId, TransportError, -}; -use rand::{rngs::OsRng, Rng}; -use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}; -use sn_protocol::{ - get_port_from_multiaddr, - messages::{CmdResponse, Query, Request, Response}, - storage::RecordType, - NetworkAddress, PrettyPrintRecordKey, -}; -use sn_transfers::PaymentQuote; -use std::{ - collections::{hash_map::Entry, BTreeSet, HashSet}, - fmt::{Debug, Formatter}, -}; -use tokio::sync::oneshot; -use tokio::time::Duration; -use tracing::{info, warn}; - -/// NodeEvent enum -#[derive(CustomDebug)] -pub(super) enum NodeEvent { - MsgReceived(request_response::Event), - Kademlia(kad::Event), - #[cfg(feature = "local-discovery")] - Mdns(Box), - Identify(Box), - Dcutr(Box), - RelayClient(Box), - RelayServer(Box), -} - -impl From> for NodeEvent { - fn from(event: request_response::Event) -> Self { - NodeEvent::MsgReceived(event) - } -} - -impl From for NodeEvent { - fn from(event: kad::Event) -> Self { - NodeEvent::Kademlia(event) - } -} - -#[cfg(feature = "local-discovery")] -impl From for NodeEvent { - fn from(event: mdns::Event) -> Self { - NodeEvent::Mdns(Box::new(event)) - } -} - -impl From for NodeEvent { - fn from(event: libp2p::identify::Event) -> Self { - NodeEvent::Identify(Box::new(event)) - } -} -impl From for NodeEvent { - fn from(event: libp2p::dcutr::Event) -> Self { - NodeEvent::Dcutr(Box::new(event)) - } -} -impl From for NodeEvent { - fn from(event: libp2p::relay::client::Event) -> Self { - NodeEvent::RelayClient(Box::new(event)) - } -} -impl From for NodeEvent { - fn from(event: libp2p::relay::Event) -> Self { - NodeEvent::RelayServer(Box::new(event)) - } -} - -#[derive(CustomDebug)] -/// Channel to send the `Response` through. -pub enum MsgResponder { - /// Respond to a request from `self` through a simple one-shot channel. - FromSelf(Option>>), - /// Respond to a request from a peer in the network. - FromPeer(PeerResponseChannel), -} - -#[allow(clippy::large_enum_variant)] -/// Events forwarded by the underlying Network; to be used by the upper layers -pub enum NetworkEvent { - /// Incoming `Query` from a peer - QueryRequestReceived { - /// Query - query: Query, - /// The channel to send the `Response` through - channel: MsgResponder, - }, - /// Handles the responses that are not awaited at the call site - ResponseReceived { - /// Response - res: Response, - }, - /// Peer has been added to the Routing Table. And the number of connected peers. - PeerAdded(PeerId, usize), - /// Peer has been removed from the Routing Table. And the number of connected peers. 
- PeerRemoved(PeerId, usize), - /// The peer does not support our protocol - PeerWithUnsupportedProtocol { - our_protocol: String, - their_protocol: String, - }, - /// The peer is now considered as a bad node, due to the detected bad behaviour - PeerConsideredAsBad { - detected_by: PeerId, - bad_peer: PeerId, - bad_behaviour: String, - }, - /// The records bearing these keys are to be fetched from the holder or the network - KeysToFetchForReplication(Vec<(PeerId, RecordKey)>), - /// Started listening on a new address - NewListenAddr(Multiaddr), - /// Report unverified record - UnverifiedRecord(Record), - /// Terminate Node on unrecoverable errors - TerminateNode { reason: TerminateNodeReason }, - /// List of peer nodes that failed to fetch replication copy from. - FailedToFetchHolders(BTreeSet), - /// A peer in RT that supposed to be verified. - BadNodeVerification { peer_id: PeerId }, - /// Quotes to be verified - QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, - /// Carry out chunk proof check against the specified record and peer - ChunkProofVerification { - peer_id: PeerId, - keys_to_verify: Vec, - }, -} - -/// Terminate node for the following reason -#[derive(Debug, Clone)] -pub enum TerminateNodeReason { - HardDiskWriteError, -} - -// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected. -impl Debug for NetworkEvent { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - NetworkEvent::QueryRequestReceived { query, .. } => { - write!(f, "NetworkEvent::QueryRequestReceived({query:?})") - } - NetworkEvent::ResponseReceived { res, .. } => { - write!(f, "NetworkEvent::ResponseReceived({res:?})") - } - NetworkEvent::PeerAdded(peer_id, connected_peers) => { - write!(f, "NetworkEvent::PeerAdded({peer_id:?}, {connected_peers})") - } - NetworkEvent::PeerRemoved(peer_id, connected_peers) => { - write!( - f, - "NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})" - ) - } - NetworkEvent::PeerWithUnsupportedProtocol { - our_protocol, - their_protocol, - } => { - write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})") - } - NetworkEvent::PeerConsideredAsBad { - bad_peer, - bad_behaviour, - .. 
- } => { - write!( - f, - "NetworkEvent::PeerConsideredAsBad({bad_peer:?}, {bad_behaviour:?})" - ) - } - NetworkEvent::KeysToFetchForReplication(list) => { - let keys_len = list.len(); - write!(f, "NetworkEvent::KeysForReplication({keys_len:?})") - } - NetworkEvent::NewListenAddr(addr) => { - write!(f, "NetworkEvent::NewListenAddr({addr:?})") - } - NetworkEvent::UnverifiedRecord(record) => { - let pretty_key = PrettyPrintRecordKey::from(&record.key); - write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})") - } - NetworkEvent::TerminateNode { reason } => { - write!(f, "NetworkEvent::TerminateNode({reason:?})") - } - NetworkEvent::FailedToFetchHolders(bad_nodes) => { - write!(f, "NetworkEvent::FailedToFetchHolders({bad_nodes:?})") - } - NetworkEvent::BadNodeVerification { peer_id } => { - write!(f, "NetworkEvent::BadNodeVerification({peer_id:?})") - } - NetworkEvent::QuoteVerification { quotes } => { - write!( - f, - "NetworkEvent::QuoteVerification({} quotes)", - quotes.len() - ) - } - NetworkEvent::ChunkProofVerification { - peer_id, - keys_to_verify, - } => { - write!( - f, - "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})" - ) - } - } - } -} - -impl SwarmDriver { - /// Handle `SwarmEvents` - pub(super) fn handle_swarm_events(&mut self, event: SwarmEvent) -> Result<()> { - let start = Instant::now(); - let event_string; - match event { - SwarmEvent::Behaviour(NodeEvent::MsgReceived(event)) => { - event_string = "msg_received"; - if let Err(e) = self.handle_msg(event) { - warn!("MsgReceivedError: {e:?}"); - } - } - SwarmEvent::Behaviour(NodeEvent::Kademlia(kad_event)) => { - event_string = "kad_event"; - self.handle_kad_event(kad_event)?; - } - SwarmEvent::Behaviour(NodeEvent::Dcutr(event)) => { - event_string = "dcutr_event"; - info!( - "Dcutr with remote peer: {:?} is: {:?}", - event.remote_peer_id, event.result - ); - } - SwarmEvent::Behaviour(NodeEvent::RelayClient(event)) => { - event_string = "relay_client_event"; - - info!(?event, "relay client event"); - - if let libp2p::relay::client::Event::ReservationReqAccepted { - relay_peer_id, .. - } = *event - { - self.relay_manager - .on_successful_reservation_by_client(&relay_peer_id, &mut self.swarm); - } - } - - SwarmEvent::Behaviour(NodeEvent::RelayServer(event)) => { - event_string = "relay_server_event"; - - info!(?event, "relay server event"); - - match *event { - libp2p::relay::Event::ReservationReqAccepted { - src_peer_id, - renewed: _, - } => { - self.relay_manager - .on_successful_reservation_by_server(src_peer_id); - } - libp2p::relay::Event::ReservationTimedOut { src_peer_id } => { - self.relay_manager.on_reservation_timeout(src_peer_id); - } - _ => {} - } - } - SwarmEvent::Behaviour(NodeEvent::Identify(iden)) => { - event_string = "identify"; - - match *iden { - libp2p::identify::Event::Received { peer_id, info } => { - trace!(%peer_id, ?info, "identify: received info"); - - if info.protocol_version != IDENTIFY_PROTOCOL_STR.to_string() { - warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {:?}", IDENTIFY_PROTOCOL_STR.as_str()); - - self.send_event(NetworkEvent::PeerWithUnsupportedProtocol { - our_protocol: IDENTIFY_PROTOCOL_STR.to_string(), - their_protocol: info.protocol_version, - }); - - return Ok(()); - } - - // if client, return. 
- if info.agent_version != IDENTIFY_NODE_VERSION_STR.to_string() { - return Ok(()); - } - - let has_dialed = self.dialed_peers.contains(&peer_id); - - // If we're not in local mode, only add globally reachable addresses. - // Strip the `/p2p/...` part of the multiaddresses. - // Collect into a HashSet directly to avoid multiple allocations and handle deduplication. - let addrs: HashSet = match self.local { - true => info - .listen_addrs - .into_iter() - .map(|addr| multiaddr_strip_p2p(&addr)) - .collect(), - false => info - .listen_addrs - .into_iter() - .filter(multiaddr_is_global) - .map(|addr| multiaddr_strip_p2p(&addr)) - .collect(), - }; - - self.relay_manager.add_potential_candidates( - &peer_id, - &addrs, - &info.protocols, - ); - - // When received an identify from un-dialed peer, try to dial it - // The dial shall trigger the same identify to be sent again and confirm - // peer is external accessible, hence safe to be added into RT. - if !self.local && !has_dialed { - // Only need to dial back for not fulfilled kbucket - let (kbucket_full, already_present_in_rt, ilog2) = - if let Some(kbucket) = - self.swarm.behaviour_mut().kademlia.kbucket(peer_id) - { - let ilog2 = kbucket.range().0.ilog2(); - let num_peers = kbucket.num_entries(); - let mut is_bucket_full = num_peers >= K_VALUE.into(); - - // check if peer_id is already a part of RT - let already_present_in_rt = kbucket - .iter() - .any(|entry| entry.node.key.preimage() == &peer_id); - - // If the bucket contains any of a bootstrap node, - // consider the bucket is not full and dial back - // so that the bootstrap nodes can be replaced. - if is_bucket_full { - if let Some(peers) = self.bootstrap_peers.get(&ilog2) { - if kbucket.iter().any(|entry| { - peers.contains(entry.node.key.preimage()) - }) { - is_bucket_full = false; - } - } - } - - (is_bucket_full, already_present_in_rt, ilog2) - } else { - return Ok(()); - }; - - if kbucket_full { - trace!("received identify for a full bucket {ilog2:?}, not dialing {peer_id:?} on {addrs:?}"); - return Ok(()); - } else if already_present_in_rt { - trace!("received identify for {peer_id:?} that is already part of the RT. Not dialing {peer_id:?} on {addrs:?}"); - return Ok(()); - } - - info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible"); - if let Err(err) = self.swarm.dial( - DialOpts::peer_id(peer_id) - .condition(PeerCondition::NotDialing) - .addresses(addrs.iter().cloned().collect()) - .build(), - ) { - warn!(%peer_id, ?addrs, "dialing error: {err:?}"); - } - - trace!( - "SwarmEvent handled in {:?}: {event_string:?}", - start.elapsed() - ); - return Ok(()); - } - - // If we are not local, we care only for peers that we dialed and thus are reachable. - if self.local || has_dialed { - // To reduce the bad_node check resource usage, - // during the connection establish process, only check cached black_list - // The periodical check, which involves network queries shall filter - // out bad_nodes eventually. - if let Some((_issues, true)) = self.bad_nodes.get(&peer_id) { - info!("Peer {peer_id:?} is considered as bad, blocking it."); - } else { - self.remove_bootstrap_from_full(peer_id); - - trace!(%peer_id, ?addrs, "identify: attempting to add addresses to routing table"); - - // Attempt to add the addresses to the routing table. 
- for multiaddr in addrs { - let _routing_update = self - .swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, multiaddr); - } - } - } - trace!( - "SwarmEvent handled in {:?}: {event_string:?}", - start.elapsed() - ); - } - // Log the other Identify events. - libp2p::identify::Event::Sent { .. } => trace!("identify: {iden:?}"), - libp2p::identify::Event::Pushed { .. } => trace!("identify: {iden:?}"), - libp2p::identify::Event::Error { .. } => trace!("identify: {iden:?}"), - } - } - #[cfg(feature = "local-discovery")] - SwarmEvent::Behaviour(NodeEvent::Mdns(mdns_event)) => { - event_string = "mdns"; - match *mdns_event { - mdns::Event::Discovered(list) => { - if self.local { - for (peer_id, addr) in list { - // The multiaddr does not contain the peer ID, so add it. - let addr = addr.with(Protocol::P2p(peer_id)); - - info!(%addr, "mDNS node discovered and dialing"); - - if let Err(err) = self.dial(addr.clone()) { - warn!(%addr, "mDNS node dial error: {err:?}"); - } - } - } - } - mdns::Event::Expired(peer) => { - trace!("mdns peer {peer:?} expired"); - } - } - } - - SwarmEvent::NewListenAddr { - address, - listener_id, - } => { - event_string = "new listen addr"; - - // update our stored port if it is configured to be 0 or None - match self.listen_port { - Some(0) | None => { - if let Some(actual_port) = get_port_from_multiaddr(&address) { - info!("Our listen port is configured as 0 or is not set. Setting it to our actual port: {actual_port}"); - self.listen_port = Some(actual_port); - } - } - _ => {} - }; - - let local_peer_id = *self.swarm.local_peer_id(); - let address = address.with(Protocol::P2p(local_peer_id)); - - // Trigger server mode if we're not a client and we should not add our own address if we're behind - // home network. - if !self.is_client && !self.is_behind_home_network { - if self.local { - // all addresses are effectively external here... 
- // this is needed for Kad Mode::Server - self.swarm.add_external_address(address.clone()); - } else { - // only add our global addresses - if multiaddr_is_global(&address) { - self.swarm.add_external_address(address.clone()); - } - } - } - - self.send_event(NetworkEvent::NewListenAddr(address.clone())); - - info!("Local node is listening {listener_id:?} on {address:?}"); - } - SwarmEvent::ListenerClosed { - listener_id, - addresses, - reason, - } => { - event_string = "listener closed"; - info!("Listener {listener_id:?} with add {addresses:?} has been closed for {reason:?}"); - self.relay_manager - .on_listener_closed(&listener_id, &mut self.swarm); - } - SwarmEvent::IncomingConnection { - connection_id, - local_addr, - send_back_addr, - } => { - event_string = "incoming"; - trace!("IncomingConnection ({connection_id:?}) with local_addr: {local_addr:?} send_back_addr: {send_back_addr:?}"); - } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - connection_id, - concurrent_dial_errors, - established_in, - } => { - event_string = "ConnectionEstablished"; - trace!(%peer_id, num_established, ?concurrent_dial_errors, "ConnectionEstablished ({connection_id:?}) in {established_in:?}: {}", endpoint_str(&endpoint)); - - let _ = self.live_connected_peers.insert( - connection_id, - (peer_id, Instant::now() + Duration::from_secs(60)), - ); - - if endpoint.is_dialer() { - self.dialed_peers.push(peer_id); - } - } - SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - cause, - num_established, - connection_id, - } => { - event_string = "ConnectionClosed"; - trace!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint)); - let _ = self.live_connected_peers.remove(&connection_id); - } - SwarmEvent::OutgoingConnectionError { - connection_id, - peer_id: None, - error, - } => { - event_string = "OutgoingConnErr"; - warn!("OutgoingConnectionError to on {connection_id:?} - {error:?}"); - } - SwarmEvent::OutgoingConnectionError { - peer_id: Some(failed_peer_id), - error, - connection_id, - } => { - event_string = "OutgoingConnErr"; - warn!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}"); - - // we need to decide if this was a critical error and the peer should be removed from the routing table - let should_clean_peer = match error { - DialError::Transport(errors) => { - // as it's an outgoing error, if it's transport based we can assume it is _our_ fault - // - // (eg, could not get a port for a tcp connection) - // so we default to it not being a real issue - // unless there are _specific_ errors (connection refused eg) - error!("Dial errors len : {:?}", errors.len()); - let mut there_is_a_serious_issue = false; - for (_addr, err) in errors { - error!("OutgoingTransport error : {err:?}"); - - match err { - TransportError::MultiaddrNotSupported(addr) => { - warn!("Multiaddr not supported : {addr:?}"); - // if we can't dial a peer on a given address, we should remove it from the routing table - there_is_a_serious_issue = true - } - TransportError::Other(err) => { - let problematic_errors = [ - "ConnectionRefused", - "HostUnreachable", - "HandshakeTimedOut", - ]; - - let is_bootstrap_peer = self - .bootstrap_peers - .iter() - .any(|(_ilog2, peers)| peers.contains(&failed_peer_id)); - - if is_bootstrap_peer - && self.connected_peers < self.bootstrap_peers.len() - { - warn!("OutgoingConnectionError: On bootstrap peer {failed_peer_id:?}, while still in bootstrap mode, ignoring"); - there_is_a_serious_issue 
= false; - } else { - // It is really difficult to match this error, due to being eg: - // Custom { kind: Other, error: Left(Left(Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })) } - // if we can match that, let's. But meanwhile we'll check the message - let error_msg = format!("{err:?}"); - if problematic_errors - .iter() - .any(|err| error_msg.contains(err)) - { - warn!("Problematic error encountered: {error_msg}"); - there_is_a_serious_issue = true; - } - } - } - } - } - there_is_a_serious_issue - } - DialError::NoAddresses => { - // We provided no address, and while we can't really blame the peer - // we also can't connect, so we opt to cleanup... - warn!("OutgoingConnectionError: No address provided"); - true - } - DialError::Aborted => { - // not their fault - warn!("OutgoingConnectionError: Aborted"); - false - } - DialError::DialPeerConditionFalse(_) => { - // we could not dial due to an internal condition, so not their issue - warn!("OutgoingConnectionError: DialPeerConditionFalse"); - false - } - DialError::LocalPeerId { endpoint, .. } => { - // This is actually _us_ So we should remove this from the RT - error!( - "OutgoingConnectionError: LocalPeerId: {}", - endpoint_str(&endpoint) - ); - true - } - DialError::WrongPeerId { obtained, endpoint } => { - // The peer id we attempted to dial was not the one we expected - // cleanup - error!("OutgoingConnectionError: WrongPeerId: obtained: {obtained:?}, endpoint: {endpoint:?}"); - true - } - DialError::Denied { cause } => { - // The peer denied our connection - // cleanup - error!("OutgoingConnectionError: Denied: {cause:?}"); - true - } - }; - - if should_clean_peer { - warn!("Tracking issue of {failed_peer_id:?}. Clearing it out for now"); - - if let Some(dead_peer) = self - .swarm - .behaviour_mut() - .kademlia - .remove_peer(&failed_peer_id) - { - self.connected_peers = self.connected_peers.saturating_sub(1); - - self.handle_cmd(SwarmCmd::RecordNodeIssue { - peer_id: failed_peer_id, - issue: crate::NodeIssue::ConnectionIssue, - })?; - - self.send_event(NetworkEvent::PeerRemoved( - *dead_peer.node.key.preimage(), - self.connected_peers, - )); - - self.log_kbuckets(&failed_peer_id); - let _ = self.check_for_change_in_our_close_group(); - } - } - } - SwarmEvent::IncomingConnectionError { - connection_id, - local_addr, - send_back_addr, - error, - } => { - event_string = "Incoming ConnErr"; - error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); - } - SwarmEvent::Dialing { - peer_id, - connection_id, - } => { - event_string = "Dialing"; - trace!("Dialing {peer_id:?} on {connection_id:?}"); - } - SwarmEvent::NewExternalAddrCandidate { address } => { - event_string = "NewExternalAddrCandidate"; - - if !self.swarm.external_addresses().any(|addr| addr == &address) - && !self.is_client - // If we are behind a home network, then our IP is returned here. We should be only having - // relay server as our external address - // todo: can our relay address be reported here? If so, maybe we should add them. - && !self.is_behind_home_network - { - debug!(%address, "external address: new candidate"); - - // Identify will let us know when we have a candidate. (Peers will tell us what address they see us as.) - // We manually confirm this to be our externally reachable address, though in theory it's possible we - // are not actually reachable. 
This event returns addresses with ports that were not set by the user, - // so we must not add those ports as they will not be forwarded. - // Setting this will also switch kad to server mode if it's not already in it. - if let Some(our_port) = self.listen_port { - if let Some(port) = get_port_from_multiaddr(&address) { - if port == our_port { - info!(%address, "external address: new candidate has the same configured port, adding it."); - self.swarm.add_external_address(address); - } else { - info!(%address, %our_port, "external address: new candidate has a different port, not adding it."); - } - } - } else { - trace!("external address: listen port not set. This has to be set if you're running a node"); - } - } - let all_external_addresses = self.swarm.external_addresses().collect_vec(); - let all_listeners = self.swarm.listeners().collect_vec(); - debug!("All our listeners: {all_listeners:?}"); - debug!("All our external addresses: {all_external_addresses:?}"); - } - SwarmEvent::ExternalAddrConfirmed { address } => { - event_string = "ExternalAddrConfirmed"; - info!(%address, "external address: confirmed"); - } - SwarmEvent::ExternalAddrExpired { address } => { - event_string = "ExternalAddrExpired"; - info!(%address, "external address: expired"); - } - other => { - event_string = "Other"; - - trace!("SwarmEvent has been ignored: {other:?}") - } - } - self.remove_outdated_connections(); - - self.log_handling(event_string.to_string(), start.elapsed()); - - trace!( - "SwarmEvent handled in {:?}: {event_string:?}", - start.elapsed() - ); - Ok(()) - } - - /// Forwards `Request` to the upper layers using `Sender`. Sends `Response` to the peers - pub fn handle_msg( - &mut self, - event: request_response::Event, - ) -> Result<(), NetworkError> { - match event { - request_response::Event::Message { message, peer } => match message { - Message::Request { - request, - channel, - request_id, - .. - } => { - trace!("Received request {request_id:?} from peer {peer:?}, req: {request:?}"); - // If the request is replication or quote verification, - // we can handle it and send the OK response here. - // As the handle result is unimportant to the sender. - match request { - Request::Cmd(sn_protocol::messages::Cmd::Replicate { holder, keys }) => { - let response = Response::Cmd( - sn_protocol::messages::CmdResponse::Replicate(Ok(())), - ); - self.swarm - .behaviour_mut() - .request_response - .send_response(channel, response) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - - self.add_keys_to_replication_fetcher(holder, keys); - } - Request::Cmd(sn_protocol::messages::Cmd::QuoteVerification { - quotes, - .. - }) => { - let response = Response::Cmd( - sn_protocol::messages::CmdResponse::QuoteVerification(Ok(())), - ); - self.swarm - .behaviour_mut() - .request_response - .send_response(channel, response) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - - // The keypair is required to verify the quotes, - // hence throw it up to Network layer for further actions. 
- let quotes = quotes - .iter() - .filter_map(|(peer_address, quote)| { - peer_address - .as_peer_id() - .map(|peer_id| (peer_id, quote.clone())) - }) - .collect(); - self.send_event(NetworkEvent::QuoteVerification { quotes }) - } - Request::Cmd(sn_protocol::messages::Cmd::PeerConsideredAsBad { - detected_by, - bad_peer, - bad_behaviour, - }) => { - let response = Response::Cmd( - sn_protocol::messages::CmdResponse::PeerConsideredAsBad(Ok(())), - ); - self.swarm - .behaviour_mut() - .request_response - .send_response(channel, response) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - - if bad_peer == NetworkAddress::from_peer(self.self_peer_id) { - warn!("Peer {detected_by:?} consider us as BAD, due to {bad_behaviour:?}."); - // TODO: shall we terminate self after received such notifications - // from the majority close_group nodes around us? - } else { - error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us."); - } - } - Request::Query(query) => { - self.send_event(NetworkEvent::QueryRequestReceived { - query, - channel: MsgResponder::FromPeer(channel), - }) - } - } - } - Message::Response { - request_id, - response, - } => { - trace!("Got response {request_id:?} from peer {peer:?}, res: {response}."); - if let Some(sender) = self.pending_requests.remove(&request_id) { - // The sender will be provided if the caller (Requester) is awaiting for a response - // at the call site. - // Else the Request was just sent to the peer and the Response was - // meant to be handled in another way and is not awaited. - match sender { - Some(sender) => sender - .send(Ok(response)) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?, - None => { - if let Response::Cmd(CmdResponse::Replicate(Ok(()))) = response { - // Nothing to do, response was fine - // This only exists to ensure we dont drop the handle and - // exit early, potentially logging false connection woes - } else { - // responses that are not awaited at the call site must be handled - // separately - self.send_event(NetworkEvent::ResponseReceived { - res: response, - }); - } - } - } - } else { - warn!("Tried to remove a RequestId from pending_requests which was not inserted in the first place. 
- Use Cmd::SendRequest with sender:None if you want the Response to be fed into the common handle_response function"); - } - } - }, - request_response::Event::OutboundFailure { - request_id, - error, - peer, - } => { - if let Some(sender) = self.pending_requests.remove(&request_id) { - match sender { - Some(sender) => { - sender - .send(Err(error.into())) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - None => { - warn!("RequestResponse: OutboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); - return Err(NetworkError::ReceivedResponseDropped(request_id)); - } - } - } else { - warn!("RequestResponse: OutboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); - return Err(NetworkError::ReceivedResponseDropped(request_id)); - } - } - request_response::Event::InboundFailure { - peer, - request_id, - error, - } => { - warn!("RequestResponse: InboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); - } - request_response::Event::ResponseSent { peer, request_id } => { - trace!("ResponseSent for request_id: {request_id:?} and peer: {peer:?}"); - } - } - Ok(()) - } - - fn handle_kad_event(&mut self, kad_event: kad::Event) -> Result<()> { - let start = Instant::now(); - let event_string; - - match kad_event { - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::GetClosestPeers(Ok(ref closest_peers)), - ref stats, - ref step, - } => { - event_string = "kad_event::get_closest_peers"; - trace!( - "Query task {id:?} of key {:?} returned with peers {:?}, {stats:?} - {step:?}", - hex::encode(closest_peers.key.clone()), - closest_peers.peers, - ); - - if let Entry::Occupied(mut entry) = self.pending_get_closest_peers.entry(id) { - let (_, current_closest) = entry.get_mut(); - - // TODO: consider order the result and terminate when reach any of the - // following criteria: - // 1, `stats.num_pending()` is 0 - // 2, `stats.duration()` is longer than a defined period - current_closest.extend(closest_peers.peers.clone()); - if current_closest.len() >= usize::from(K_VALUE) || step.last { - let (get_closest_type, current_closest) = entry.remove(); - match get_closest_type { - PendingGetClosestType::NetworkDiscovery => self - .network_discovery - .handle_get_closest_query(current_closest), - PendingGetClosestType::FunctionCall(sender) => { - sender - .send(current_closest) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - } - } - } else { - trace!("Can't locate query task {id:?}, it has likely been completed already."); - return Err(NetworkError::ReceivedKademliaEventDropped { - query_id: id, - event: "GetClosestPeers Ok".to_string(), - }); - } - } - // Handle GetClosestPeers timeouts - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::GetClosestPeers(Err(ref err)), - ref stats, - ref step, - } => { - event_string = "kad_event::get_closest_peers_err"; - error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}"); - - let (get_closest_type, mut current_closest) = - self.pending_get_closest_peers.remove(&id).ok_or_else(|| { - trace!( - "Can't locate query task {id:?}, it has likely been completed already." - ); - NetworkError::ReceivedKademliaEventDropped { - query_id: id, - event: "Get ClosestPeers error".to_string(), - } - })?; - - // We have `current_closest` from previous progress, - // and `peers` from `GetClosestPeersError`. - // Trust them and leave for the caller to check whether they are enough. 
- match err { - GetClosestPeersError::Timeout { ref peers, .. } => { - current_closest.extend(peers); - } - } - - match get_closest_type { - PendingGetClosestType::NetworkDiscovery => self - .network_discovery - .handle_get_closest_query(current_closest), - PendingGetClosestType::FunctionCall(sender) => { - sender - .send(current_closest) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - } - } - - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))), - stats, - step, - } => { - event_string = "kad_event::get_record::found"; - trace!( - "Query task {id:?} returned with record {:?} from peer {:?}, {stats:?} - {step:?}", - PrettyPrintRecordKey::from(&peer_record.record.key), - peer_record.peer - ); - self.accumulate_get_record_found(id, peer_record, stats, step)?; - } - kad::Event::OutboundQueryProgressed { - id, - result: - QueryResult::GetRecord(Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { - cache_candidates, - })), - stats, - step, - } => { - event_string = "kad_event::get_record::finished_no_additional"; - trace!("Query task {id:?} of get_record completed with {stats:?} - {step:?} - {cache_candidates:?}"); - self.handle_get_record_finished(id, step)?; - } - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Err(get_record_err)), - stats, - step, - } => { - // log the errors - match &get_record_err { - kad::GetRecordError::NotFound { key, closest_peers } => { - event_string = "kad_event::GetRecordError::NotFound"; - info!("Query task {id:?} NotFound record {:?} among peers {closest_peers:?}, {stats:?} - {step:?}", - PrettyPrintRecordKey::from(key)); - } - kad::GetRecordError::QuorumFailed { - key, - records, - quorum, - } => { - event_string = "kad_event::GetRecordError::QuorumFailed"; - let pretty_key = PrettyPrintRecordKey::from(key); - let peers = records - .iter() - .map(|peer_record| peer_record.peer) - .collect_vec(); - info!("Query task {id:?} QuorumFailed record {pretty_key:?} among peers {peers:?} with quorum {quorum:?}, {stats:?} - {step:?}"); - } - kad::GetRecordError::Timeout { key } => { - event_string = "kad_event::GetRecordError::Timeout"; - let pretty_key = PrettyPrintRecordKey::from(key); - - debug!( - "Query task {id:?} timed out when looking for record {pretty_key:?}" - ); - } - } - self.handle_get_record_error(id, get_record_err, stats, step)?; - } - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::PutRecord(Err(put_record_err)), - stats, - step, - } => { - // Currently, only `client` calls `put_record_to` to upload data. - // The result of such operation is not critical to client in general. - // However, if client keeps receiving error responses, it may indicating: - // 1, Client itself is with slow connection - // OR - // 2, The payee node selected could be in trouble - // - // TODO: Figure out which payee node the error response is related to, - // and may exclude that node from later on payee selection. 
- let (key, success, quorum) = match &put_record_err { - kad::PutRecordError::QuorumFailed { - key, - success, - quorum, - } => { - event_string = "kad_event::PutRecordError::QuorumFailed"; - (key, success, quorum) - } - kad::PutRecordError::Timeout { - key, - success, - quorum, - } => { - event_string = "kad_event::PutRecordError::Timeout"; - (key, success, quorum) - } - }; - error!("Query task {id:?} failed put record {:?} {:?}, required quorum {quorum}, stored on {success:?}, {stats:?} - {step:?}", - PrettyPrintRecordKey::from(key), event_string); - } - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::PutRecord(Ok(put_record_ok)), - stats, - step, - } => { - event_string = "kad_event::PutRecordOk"; - trace!( - "Query task {id:?} put record {:?} ok, {stats:?} - {step:?}", - PrettyPrintRecordKey::from(&put_record_ok.key) - ); - } - // Shall no longer receive this event - kad::Event::OutboundQueryProgressed { - id, - result: QueryResult::Bootstrap(bootstrap_result), - step, - .. - } => { - event_string = "kad_event::OutboundQueryProgressed::Bootstrap"; - // here BootstrapOk::num_remaining refers to the remaining random peer IDs to query, one per - // bucket that still needs refreshing. - trace!("Kademlia Bootstrap with {id:?} progressed with {bootstrap_result:?} and step {step:?}"); - } - kad::Event::RoutingUpdated { - peer, - is_new_peer, - old_peer, - .. - } => { - event_string = "kad_event::RoutingUpdated"; - if is_new_peer { - self.connected_peers = self.connected_peers.saturating_add(1); - - info!("New peer added to routing table: {peer:?}, now we have #{} connected peers", self.connected_peers); - self.log_kbuckets(&peer); - - // This should only happen once - if self.bootstrap.notify_new_peer() { - info!("Performing the first bootstrap"); - self.trigger_network_discovery(); - } - self.send_event(NetworkEvent::PeerAdded(peer, self.connected_peers)); - } - - info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.connected_peers); - if old_peer.is_some() { - self.connected_peers = self.connected_peers.saturating_sub(1); - - info!("Evicted old peer on new peer join: {old_peer:?}"); - self.send_event(NetworkEvent::PeerRemoved(peer, self.connected_peers)); - self.log_kbuckets(&peer); - } - let _ = self.check_for_change_in_our_close_group(); - } - kad::Event::InboundRequest { - request: InboundRequest::PutRecord { .. }, - } => { - event_string = "kad_event::InboundRequest::PutRecord"; - // Ignored to reduce logging. When `Record filtering` is enabled, - // the `record` variable will contain the content for further validation before put. - } - kad::Event::InboundRequest { - request: InboundRequest::FindNode { .. }, - } => { - event_string = "kad_event::InboundRequest::FindNode"; - // Ignored to reduce logging. With continuous bootstrap, this is triggered often. - } - kad::Event::InboundRequest { - request: - InboundRequest::GetRecord { - num_closer_peers, - present_locally, - }, - } => { - event_string = "kad_event::InboundRequest::GetRecord"; - if !present_locally && num_closer_peers < CLOSE_GROUP_SIZE { - trace!("InboundRequest::GetRecord doesn't have local record, with {num_closer_peers:?} closer_peers"); - } - } - kad::Event::UnroutablePeer { peer } => { - event_string = "kad_event::UnroutablePeer"; - trace!(peer_id = %peer, "kad::Event: UnroutablePeer"); - } - kad::Event::RoutablePeer { peer, .. } => { - // We get this when we don't add a peer via the identify step. 
- // And we don't want to add these as they were rejected by identify for some reason. - event_string = "kad_event::RoutablePeer"; - trace!(peer_id = %peer, "kad::Event: RoutablePeer"); - } - other => { - event_string = "kad_event::Other"; - trace!("kad::Event ignored: {other:?}"); - } - } - - self.log_handling(event_string.to_string(), start.elapsed()); - - trace!( - "kad::Event handled in {:?}: {event_string:?}", - start.elapsed() - ); - - Ok(()) - } - - /// Check for changes in our close group - /// - pub(crate) fn check_for_change_in_our_close_group(&mut self) -> bool { - // this includes self - let closest_k_peers = self.get_closest_k_value_local_peers(); - - let new_closest_peers: Vec<_> = - closest_k_peers.into_iter().take(CLOSE_GROUP_SIZE).collect(); - - let old = self.close_group.iter().cloned().collect::>(); - let new_members: Vec<_> = new_closest_peers - .iter() - .filter(|p| !old.contains(p)) - .collect(); - if !new_members.is_empty() { - debug!("The close group has been updated. The new members are {new_members:?}"); - debug!("New close group: {new_closest_peers:?}"); - self.close_group = new_closest_peers; - true - } else { - false - } - } - - pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) { - let distance = NetworkAddress::from_peer(self.self_peer_id) - .distance(&NetworkAddress::from_peer(*peer)); - info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2()); - let mut kbucket_table_stats = vec![]; - let mut index = 0; - let mut total_peers = 0; - for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() { - let range = kbucket.range(); - total_peers += kbucket.num_entries(); - if let Some(distance) = range.0.ilog2() { - kbucket_table_stats.push((index, kbucket.num_entries(), distance)); - } else { - // This shall never happen. - error!("bucket #{index:?} is ourself ???!!!"); - } - index += 1; - } - info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}"); - } - - // if target bucket is full, remove a bootstrap node if presents. - fn remove_bootstrap_from_full(&mut self, peer_id: PeerId) { - let mut shall_removed = None; - - if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) { - if kbucket.num_entries() >= K_VALUE.into() { - if let Some(peers) = self.bootstrap_peers.get(&kbucket.range().0.ilog2()) { - for peer_entry in kbucket.iter() { - if peers.contains(peer_entry.node.key.preimage()) { - shall_removed = Some(*peer_entry.node.key.preimage()); - break; - } - } - } - } - } - if let Some(to_be_removed_bootstrap) = shall_removed { - trace!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}"); - let _entry = self - .swarm - .behaviour_mut() - .kademlia - .remove_peer(&to_be_removed_bootstrap); - } - } - - // Remove outdated connection to a peer if it is not in the RT. 
- fn remove_outdated_connections(&mut self) { - let mut shall_removed = vec![]; - - let timed_out_connections = - self.live_connected_peers - .iter() - .filter_map(|(connection_id, (peer_id, timeout))| { - if Instant::now() > *timeout { - Some((connection_id, peer_id)) - } else { - None - } - }); - - for (connection_id, peer_id) in timed_out_connections { - // Skip if the peer is present in our RT - if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(*peer_id) { - if kbucket - .iter() - .any(|peer_entry| *peer_id == *peer_entry.node.key.preimage()) - { - continue; - } - } - - // skip if the peer is a relay server that we're connected to - if self.relay_manager.keep_alive_peer(peer_id) { - continue; - } - - shall_removed.push((*connection_id, *peer_id)); - } - - if !shall_removed.is_empty() { - trace!( - "Current libp2p peers pool stats is {:?}", - self.swarm.network_info() - ); - trace!( - "Removing {} outdated live connections, still have {} left.", - shall_removed.len(), - self.live_connected_peers.len() - ); - trace!(?self.relay_manager); - - for (connection_id, peer_id) in shall_removed { - let _ = self.live_connected_peers.remove(&connection_id); - let result = self.swarm.close_connection(connection_id); - trace!("Removed outdated connection {connection_id:?} to {peer_id:?} with result: {result:?}"); - } - } - } - - fn add_keys_to_replication_fetcher( - &mut self, - sender: NetworkAddress, - incoming_keys: Vec<(NetworkAddress, RecordType)>, - ) { - let holder = if let Some(peer_id) = sender.as_peer_id() { - peer_id - } else { - warn!("Replication list sender is not a peer_id {sender:?}"); - return; - }; - - trace!( - "Received replication list from {holder:?} of {} keys", - incoming_keys.len() - ); - - // accept replication requests from the K_VALUE peers away, - // giving us some margin for replication - let closest_k_peers = self.get_closest_k_value_local_peers(); - if !closest_k_peers.contains(&holder) || holder == self.self_peer_id { - trace!("Holder {holder:?} is self or not in replication range."); - return; - } - - // On receive a replication_list from a close_group peer, we undertake two tasks: - // 1, For those keys that we don't have: - // fetch them if close enough to us - // 2, For those keys that we have and supposed to be held by the sender as well: - // start chunk_proof check against a randomly selected chunk type record to the sender - - // For fetching, only handle those non-exist and in close range keys - let keys_to_store = - self.select_non_existent_records_for_replications(&incoming_keys, &closest_k_peers); - - if keys_to_store.is_empty() { - debug!("Empty keys to store after adding to"); - } else { - #[allow(clippy::mutable_key_type)] - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - let keys_to_fetch = self - .replication_fetcher - .add_keys(holder, keys_to_store, all_keys); - if keys_to_fetch.is_empty() { - trace!("no waiting keys to fetch from the network"); - } else { - self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); - } - } - - // Only trigger chunk_proof check when received a periodical replication request. 
- if incoming_keys.len() > 1 { - let keys_to_verify = self.select_verification_data_candidates(sender); - - if keys_to_verify.is_empty() { - debug!("No valid candidate to be checked against peer {holder:?}"); - } else { - self.send_event(NetworkEvent::ChunkProofVerification { - peer_id: holder, - keys_to_verify, - }); - } - } - } - - /// Checks suggested records against what we hold, so we only - /// enqueue what we do not have - fn select_non_existent_records_for_replications( - &mut self, - incoming_keys: &[(NetworkAddress, RecordType)], - closest_k_peers: &Vec, - ) -> Vec<(NetworkAddress, RecordType)> { - #[allow(clippy::mutable_key_type)] - let locally_stored_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - let non_existent_keys: Vec<_> = incoming_keys - .iter() - .filter(|(addr, record_type)| { - let key = addr.to_record_key(); - let local = locally_stored_keys.get(&key); - - // if we have a local value of matching record_type, we don't need to fetch it - if let Some((_, local_record_type)) = local { - let not_same_type = local_record_type != record_type; - if not_same_type { - // Shall only happens for Register - info!("Record {addr:?} has different type: local {local_record_type:?}, incoming {record_type:?}"); - } - not_same_type - } else { - true - } - }) - .collect(); - - non_existent_keys - .into_iter() - .filter_map(|(key, record_type)| { - if Self::is_in_close_range(&self.self_peer_id, key, closest_k_peers) { - Some((key.clone(), record_type.clone())) - } else { - // Reduce the log level as there will always be around 40% records being - // out of the close range, as the sender side is using `CLOSE_GROUP_SIZE + 2` - // to send our replication list to provide addressing margin. - // Given there will normally be 6 nodes sending such list with interval of 5-10s, - // this will accumulate to a lot of logs with the increasing records uploaded. - trace!("not in close range for key {key:?}"); - None - } - }) - .collect() - } - - // A close target doesn't falls into the close peers range: - // For example, a node b11111X has an RT: [(1, b1111), (2, b111), (5, b11), (9, b1), (7, b0)] - // Then for a target bearing b011111 as prefix, all nodes in (7, b0) are its close_group peers. - // Then the node b11111X. But b11111X's close_group peers [(1, b1111), (2, b111), (5, b11)] - // are none among target b011111's close range. - // Hence, the ilog2 calculation based on close_range cannot cover such case. - // And have to sort all nodes to figure out whether self is among the close_group to the target. - pub(crate) fn is_in_close_range( - our_peer_id: &PeerId, - target: &NetworkAddress, - all_peers: &Vec, - ) -> bool { - if all_peers.len() <= REPLICATION_PEERS_COUNT { - return true; - } - - // Margin of 2 to allow our RT being bit lagging. - match sort_peers_by_address(all_peers, target, REPLICATION_PEERS_COUNT) { - Ok(close_group) => close_group.contains(&our_peer_id), - Err(err) => { - warn!("Could not get sorted peers for {target:?} with error {err:?}"); - true - } - } - } - - /// Check among all chunk type records that we have, select those close to the peer, - /// and randomly pick one as the verification candidate. 
- #[allow(clippy::mutable_key_type)] - fn select_verification_data_candidates(&mut self, peer: NetworkAddress) -> Vec { - let mut closest_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(20) - .collect_vec(); - closest_peers.push(self.self_peer_id); - - let target_peer = if let Some(peer_id) = peer.as_peer_id() { - peer_id - } else { - error!("Target {peer:?} is not a valid PeerId"); - return vec![]; - }; - - #[allow(clippy::mutable_key_type)] - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - - // Targeted chunk type record shall be expected within the close range from our perspective. - let mut verify_candidates: Vec = all_keys - .values() - .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { - match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { - Ok(close_group) => { - if close_group.contains(&&target_peer) { - Some(addr.clone()) - } else { - None - } - } - Err(err) => { - warn!("Could not get sorted peers for {addr:?} with error {err:?}"); - None - } - } - } else { - None - } - }) - .collect(); - - verify_candidates.sort_by_key(|a| peer.distance(a)); - - // To ensure the candidate mush have to be held by the peer, - // we only carry out check when there are already certain amount of chunks uploaded - // AND choose candidate from certain reduced range. - if verify_candidates.len() > 50 { - let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); - vec![verify_candidates[index].clone()] - } else { - vec![] - } - } -} - -/// Helper function to print formatted connection role info. -fn endpoint_str(endpoint: &libp2p::core::ConnectedPoint) -> String { - match endpoint { - libp2p::core::ConnectedPoint::Dialer { address, .. } => { - format!("outgoing ({address})") - } - libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { - format!("incoming ({send_back_addr})") - } - } -} diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs new file mode 100644 index 0000000000..7f3e111751 --- /dev/null +++ b/sn_networking/src/event/kad.rs @@ -0,0 +1,585 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use crate::{ + driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError, + NetworkEvent, Result, SwarmDriver, CLOSE_GROUP_SIZE, +}; +use itertools::Itertools; +use libp2p::kad::{ + self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult, + QueryStats, Record, K_VALUE, +}; +use sn_protocol::PrettyPrintRecordKey; +use std::{ + collections::{hash_map::Entry, HashSet}, + time::Instant, +}; +use tokio::sync::oneshot; +use xor_name::XorName; + +impl SwarmDriver { + pub(super) fn handle_kad_event(&mut self, kad_event: libp2p::kad::Event) -> Result<()> { + let start = Instant::now(); + let event_string; + + match kad_event { + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::GetClosestPeers(Ok(ref closest_peers)), + ref stats, + ref step, + } => { + event_string = "kad_event::get_closest_peers"; + trace!( + "Query task {id:?} of key {:?} returned with peers {:?}, {stats:?} - {step:?}", + hex::encode(closest_peers.key.clone()), + closest_peers.peers, + ); + + if let Entry::Occupied(mut entry) = self.pending_get_closest_peers.entry(id) { + let (_, current_closest) = entry.get_mut(); + + // TODO: consider order the result and terminate when reach any of the + // following criteria: + // 1, `stats.num_pending()` is 0 + // 2, `stats.duration()` is longer than a defined period + current_closest.extend(closest_peers.peers.clone()); + if current_closest.len() >= usize::from(K_VALUE) || step.last { + let (get_closest_type, current_closest) = entry.remove(); + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + } + } + } else { + trace!("Can't locate query task {id:?}, it has likely been completed already."); + return Err(NetworkError::ReceivedKademliaEventDropped { + query_id: id, + event: "GetClosestPeers Ok".to_string(), + }); + } + } + // Handle GetClosestPeers timeouts + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::GetClosestPeers(Err(ref err)), + ref stats, + ref step, + } => { + event_string = "kad_event::get_closest_peers_err"; + error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}"); + + let (get_closest_type, mut current_closest) = + self.pending_get_closest_peers.remove(&id).ok_or_else(|| { + trace!( + "Can't locate query task {id:?}, it has likely been completed already." + ); + NetworkError::ReceivedKademliaEventDropped { + query_id: id, + event: "Get ClosestPeers error".to_string(), + } + })?; + + // We have `current_closest` from previous progress, + // and `peers` from `GetClosestPeersError`. + // Trust them and leave for the caller to check whether they are enough. + match err { + GetClosestPeersError::Timeout { ref peers, .. 
} => { + current_closest.extend(peers); + } + } + + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + } + } + + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))), + stats, + step, + } => { + event_string = "kad_event::get_record::found"; + trace!( + "Query task {id:?} returned with record {:?} from peer {:?}, {stats:?} - {step:?}", + PrettyPrintRecordKey::from(&peer_record.record.key), + peer_record.peer + ); + self.accumulate_get_record_found(id, peer_record, stats, step)?; + } + kad::Event::OutboundQueryProgressed { + id, + result: + QueryResult::GetRecord(Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { + cache_candidates, + })), + stats, + step, + } => { + event_string = "kad_event::get_record::finished_no_additional"; + trace!("Query task {id:?} of get_record completed with {stats:?} - {step:?} - {cache_candidates:?}"); + self.handle_get_record_finished(id, step)?; + } + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Err(get_record_err)), + stats, + step, + } => { + // log the errors + match &get_record_err { + kad::GetRecordError::NotFound { key, closest_peers } => { + event_string = "kad_event::GetRecordError::NotFound"; + info!("Query task {id:?} NotFound record {:?} among peers {closest_peers:?}, {stats:?} - {step:?}", + PrettyPrintRecordKey::from(key)); + } + kad::GetRecordError::QuorumFailed { + key, + records, + quorum, + } => { + event_string = "kad_event::GetRecordError::QuorumFailed"; + let pretty_key = PrettyPrintRecordKey::from(key); + let peers = records + .iter() + .map(|peer_record| peer_record.peer) + .collect_vec(); + info!("Query task {id:?} QuorumFailed record {pretty_key:?} among peers {peers:?} with quorum {quorum:?}, {stats:?} - {step:?}"); + } + kad::GetRecordError::Timeout { key } => { + event_string = "kad_event::GetRecordError::Timeout"; + let pretty_key = PrettyPrintRecordKey::from(key); + + debug!( + "Query task {id:?} timed out when looking for record {pretty_key:?}" + ); + } + } + self.handle_get_record_error(id, get_record_err, stats, step)?; + } + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::PutRecord(Err(put_record_err)), + stats, + step, + } => { + // Currently, only `client` calls `put_record_to` to upload data. + // The result of such operation is not critical to client in general. + // However, if client keeps receiving error responses, it may indicating: + // 1, Client itself is with slow connection + // OR + // 2, The payee node selected could be in trouble + // + // TODO: Figure out which payee node the error response is related to, + // and may exclude that node from later on payee selection. 
+ let (key, success, quorum) = match &put_record_err { + kad::PutRecordError::QuorumFailed { + key, + success, + quorum, + } => { + event_string = "kad_event::PutRecordError::QuorumFailed"; + (key, success, quorum) + } + kad::PutRecordError::Timeout { + key, + success, + quorum, + } => { + event_string = "kad_event::PutRecordError::Timeout"; + (key, success, quorum) + } + }; + error!("Query task {id:?} failed put record {:?} {:?}, required quorum {quorum}, stored on {success:?}, {stats:?} - {step:?}", + PrettyPrintRecordKey::from(key), event_string); + } + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::PutRecord(Ok(put_record_ok)), + stats, + step, + } => { + event_string = "kad_event::PutRecordOk"; + trace!( + "Query task {id:?} put record {:?} ok, {stats:?} - {step:?}", + PrettyPrintRecordKey::from(&put_record_ok.key) + ); + } + // Shall no longer receive this event + kad::Event::OutboundQueryProgressed { + id, + result: QueryResult::Bootstrap(bootstrap_result), + step, + .. + } => { + event_string = "kad_event::OutboundQueryProgressed::Bootstrap"; + // here BootstrapOk::num_remaining refers to the remaining random peer IDs to query, one per + // bucket that still needs refreshing. + trace!("Kademlia Bootstrap with {id:?} progressed with {bootstrap_result:?} and step {step:?}"); + } + kad::Event::RoutingUpdated { + peer, + is_new_peer, + old_peer, + .. + } => { + event_string = "kad_event::RoutingUpdated"; + if is_new_peer { + self.connected_peers = self.connected_peers.saturating_add(1); + + info!("New peer added to routing table: {peer:?}, now we have #{} connected peers", self.connected_peers); + self.log_kbuckets(&peer); + + // This should only happen once + if self.bootstrap.notify_new_peer() { + info!("Performing the first bootstrap"); + self.trigger_network_discovery(); + } + self.send_event(NetworkEvent::PeerAdded(peer, self.connected_peers)); + } + + info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.connected_peers); + if old_peer.is_some() { + self.connected_peers = self.connected_peers.saturating_sub(1); + + info!("Evicted old peer on new peer join: {old_peer:?}"); + self.send_event(NetworkEvent::PeerRemoved(peer, self.connected_peers)); + self.log_kbuckets(&peer); + } + let _ = self.check_for_change_in_our_close_group(); + } + kad::Event::InboundRequest { + request: InboundRequest::PutRecord { .. }, + } => { + event_string = "kad_event::InboundRequest::PutRecord"; + // Ignored to reduce logging. When `Record filtering` is enabled, + // the `record` variable will contain the content for further validation before put. + } + kad::Event::InboundRequest { + request: InboundRequest::FindNode { .. }, + } => { + event_string = "kad_event::InboundRequest::FindNode"; + // Ignored to reduce logging. With continuous bootstrap, this is triggered often. + } + kad::Event::InboundRequest { + request: + InboundRequest::GetRecord { + num_closer_peers, + present_locally, + }, + } => { + event_string = "kad_event::InboundRequest::GetRecord"; + if !present_locally && num_closer_peers < CLOSE_GROUP_SIZE { + trace!("InboundRequest::GetRecord doesn't have local record, with {num_closer_peers:?} closer_peers"); + } + } + kad::Event::UnroutablePeer { peer } => { + event_string = "kad_event::UnroutablePeer"; + trace!(peer_id = %peer, "kad::Event: UnroutablePeer"); + } + kad::Event::RoutablePeer { peer, .. } => { + // We get this when we don't add a peer via the identify step. 
+ // And we don't want to add these as they were rejected by identify for some reason. + event_string = "kad_event::RoutablePeer"; + trace!(peer_id = %peer, "kad::Event: RoutablePeer"); + } + other => { + event_string = "kad_event::Other"; + trace!("kad::Event ignored: {other:?}"); + } + } + + self.log_handling(event_string.to_string(), start.elapsed()); + + trace!( + "kad::Event handled in {:?}: {event_string:?}", + start.elapsed() + ); + + Ok(()) + } + + // For `get_record` returning behaviour: + // 1, targeting a non-existing entry + // there will only be one event of `kad::Event::OutboundQueryProgressed` + // with `ProgressStep::last` to be `true` + // `QueryStats::requests` to be 20 (K-Value) + // `QueryStats::success` to be over majority of the requests + // `err::NotFound::closest_peers` contains a list of CLOSE_GROUP_SIZE peers + // 2, targeting an existing entry + // there will a sequence of (at least CLOSE_GROUP_SIZE) events of + // `kad::Event::OutboundQueryProgressed` to be received + // with `QueryStats::end` always being `None` + // `ProgressStep::last` all to be `false` + // `ProgressStep::count` to be increased with step of 1 + // capped and stopped at CLOSE_GROUP_SIZE, may have duplicated counts + // `PeerRecord::peer` could be None to indicate from self + // in which case it always use a duplicated `ProgressStep::count` + // the sequence will be completed with `FinishedWithNoAdditionalRecord` + // where: `cache_candidates`: being the peers supposed to hold the record but not + // `ProgressStep::count`: to be `number of received copies plus one` + // `ProgressStep::last` to be `true` + + /// Accumulates the GetRecord query results + /// If we get enough responses (quorum) for a record with the same content hash: + /// - we return the Record after comparing with the target record. This might return RecordDoesNotMatch if the + /// check fails. + /// - if multiple content hashes are found, we return a SplitRecord Error + /// And then we stop the kad query as we are done here. + fn accumulate_get_record_found( + &mut self, + query_id: QueryId, + peer_record: PeerRecord, + _stats: QueryStats, + step: ProgressStep, + ) -> Result<()> { + let peer_id = if let Some(peer_id) = peer_record.peer { + peer_id + } else { + self.self_peer_id + }; + let pretty_key = PrettyPrintRecordKey::from(&peer_record.record.key).into_owned(); + + if let Entry::Occupied(mut entry) = self.pending_get_record.entry(query_id) { + let (_sender, result_map, cfg) = entry.get_mut(); + + if !cfg.expected_holders.is_empty() { + if cfg.expected_holders.remove(&peer_id) { + debug!("For record {pretty_key:?} task {query_id:?}, received a copy from an expected holder {peer_id:?}"); + } else { + debug!("For record {pretty_key:?} task {query_id:?}, received a copy from an unexpected holder {peer_id:?}"); + } + } + + // Insert the record and the peer into the result_map. 
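The insertion code that follows keys the result map by the XorName hash of the record content, which is what lets the accumulator treat two different payloads under the same record key as distinct versions. A self-contained sketch of that idea (editorial, assuming only the `xor_name` API already used in this diff):

use std::collections::HashMap;
use xor_name::XorName;

/// Counts how many distinct content versions a set of fetched values represents.
fn count_versions(values: &[Vec<u8>]) -> usize {
    let mut versions: HashMap<XorName, usize> = HashMap::new();
    for value in values {
        // Same record key, different bytes => different XorName => another "version".
        *versions.entry(XorName::from_content(value)).or_insert(0) += 1;
    }
    versions.len()
}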
+ let record_content_hash = XorName::from_content(&peer_record.record.value); + let responded_peers = + if let Entry::Occupied(mut entry) = result_map.entry(record_content_hash) { + let (_, peer_list) = entry.get_mut(); + let _ = peer_list.insert(peer_id); + peer_list.len() + } else { + let mut peer_list = HashSet::new(); + let _ = peer_list.insert(peer_id); + result_map.insert(record_content_hash, (peer_record.record.clone(), peer_list)); + 1 + }; + + let expected_answers = get_quorum_value(&cfg.get_quorum); + + trace!("Expecting {expected_answers:?} answers for record {pretty_key:?} task {query_id:?}, received {responded_peers} so far"); + + if responded_peers >= expected_answers { + if !cfg.expected_holders.is_empty() { + debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with non-responded expected holders {:?}", cfg.expected_holders); + } + let cfg = cfg.clone(); + + // Remove the query task and consume the variables. + let (sender, result_map, _) = entry.remove(); + + if result_map.len() == 1 { + Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?; + } else { + debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record"); + sender + .send(Err(GetRecordError::SplitRecord { result_map })) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + + // Stop the query; possibly stops more nodes from being queried. + if let Some(mut query) = self.swarm.behaviour_mut().kademlia.query_mut(&query_id) { + query.finish(); + } + } else if usize::from(step.count) >= CLOSE_GROUP_SIZE { + debug!("For record {pretty_key:?} task {query_id:?}, got {:?} with {} versions so far.", + step.count, result_map.len()); + } + } else { + // return error if the entry cannot be found + return Err(NetworkError::ReceivedKademliaEventDropped { + query_id, + event: format!("Accumulate Get Record of {pretty_key:?}"), + }); + } + Ok(()) + } + + /// Handles the possible cases when a GetRecord Query completes. + /// The accumulate_get_record_found returns the record if the quorum is satisfied, but, if we have reached this point + /// then we did not get enough records or we got split records (which prevented the quorum to pass). + /// Returns the following errors: + /// RecordNotFound if the result_map is empty. + /// NotEnoughCopies if there is only a single content hash version. + /// SplitRecord if there are multiple content hash versions. 
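The accumulation step above boils down to a small decision: keep waiting until the quorum is met, then either return the single version or report a split record. A standalone sketch with made-up names, placed here before the finish handler that the doc comment above introduces:

enum AccumulationOutcome {
    KeepWaiting,
    ReturnSingleVersion,
    ReportSplitRecord,
}

fn accumulation_outcome(responded_peers: usize, expected_answers: usize, versions: usize) -> AccumulationOutcome {
    if responded_peers < expected_answers {
        AccumulationOutcome::KeepWaiting
    } else if versions == 1 {
        AccumulationOutcome::ReturnSingleVersion
    } else {
        AccumulationOutcome::ReportSplitRecord
    }
}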
+ fn handle_get_record_finished(&mut self, query_id: QueryId, step: ProgressStep) -> Result<()> { + // return error if the entry cannot be found + if let Some((sender, result_map, cfg)) = self.pending_get_record.remove(&query_id) { + let num_of_versions = result_map.len(); + let (result, log_string) = if let Some((record, from_peers)) = + result_map.values().next() + { + let result = if num_of_versions == 1 { + Err(GetRecordError::NotEnoughCopies { + record: record.clone(), + expected: get_quorum_value(&cfg.get_quorum), + got: from_peers.len(), + }) + } else { + Err(GetRecordError::SplitRecord { + result_map: result_map.clone(), + }) + }; + + ( + result, + format!("Getting record {:?} completed with only {:?} copies received, and {num_of_versions} versions.", + PrettyPrintRecordKey::from(&record.key), usize::from(step.count) - 1) + ) + } else { + ( + Err(GetRecordError::RecordNotFound), + format!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count), + ) + }; + + if cfg.expected_holders.is_empty() { + debug!("{log_string}"); + } else { + debug!( + "{log_string}, and {:?} expected holders not responded", + cfg.expected_holders + ); + } + + sender + .send(result) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } else { + // We manually perform `query.finish()` if we return early from accumulate fn. + // Thus we will still get FinishedWithNoAdditionalRecord. + trace!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender."); + } + Ok(()) + } + + /// Handles the possible cases when a kad GetRecord returns an error. + /// If we get NotFound/QuorumFailed, we return a RecordNotFound error. Kad currently does not enforce any quorum. + /// If we get a Timeout: + /// - return a QueryTimeout if we get a split record (?) if we have multiple content hashes. + /// - if the quorum is satisfied, we return the record after comparing it with the target record. This might return + /// RecordDoesNotMatch if the check fails. + /// - else we return q QueryTimeout error. + fn handle_get_record_error( + &mut self, + query_id: QueryId, + get_record_err: kad::GetRecordError, + _stats: QueryStats, + _step: ProgressStep, + ) -> Result<()> { + match &get_record_err { + kad::GetRecordError::NotFound { .. } | kad::GetRecordError::QuorumFailed { .. } => { + // return error if the entry cannot be found + let (sender, _, cfg) = + self.pending_get_record.remove(&query_id).ok_or_else(|| { + trace!("Can't locate query task {query_id:?}, it has likely been completed already."); + NetworkError::ReceivedKademliaEventDropped { + query_id, + event: "GetRecordError NotFound or QuorumFailed".to_string(), + } + })?; + + if cfg.expected_holders.is_empty() { + info!("Get record task {query_id:?} failed with error {get_record_err:?}"); + } else { + debug!("Get record task {query_id:?} failed with {:?} expected holders not responded, error {get_record_err:?}", cfg.expected_holders); + } + sender + .send(Err(GetRecordError::RecordNotFound)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + kad::GetRecordError::Timeout { key } => { + // return error if the entry cannot be found + let pretty_key = PrettyPrintRecordKey::from(key); + let (sender, result_map, cfg) = + self.pending_get_record.remove(&query_id).ok_or_else(|| { + trace!( + "Can't locate query task {query_id:?} for {pretty_key:?}, it has likely been completed already." 
+ ); + NetworkError::ReceivedKademliaEventDropped { + query_id, + event: format!("GetRecordError Timeout {pretty_key:?}"), + } + })?; + + let required_response_count = get_quorum_value(&cfg.get_quorum); + + // if we've a split over the result xorname, then we don't attempt to resolve this here. + // Retry and resolve through normal flows without a timeout. + // todo: is the above still the case? Why don't we return a split record error. + if result_map.len() > 1 { + warn!( + "Get record task {query_id:?} for {pretty_key:?} timed out with split result map" + ); + sender + .send(Err(GetRecordError::QueryTimeout)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + + return Ok(()); + } + + // if we have enough responses here, we can return the record + if let Some((record, peers)) = result_map.values().next() { + if peers.len() >= required_response_count { + Self::send_record_after_checking_target(sender, record.clone(), &cfg)?; + return Ok(()); + } + } + + warn!("Get record task {query_id:?} for {pretty_key:?} returned insufficient responses. {:?} did not return record", cfg.expected_holders); + // Otherwise report the timeout + sender + .send(Err(GetRecordError::QueryTimeout)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + } + + Ok(()) + } + + fn send_record_after_checking_target( + sender: oneshot::Sender>, + record: Record, + cfg: &GetRecordCfg, + ) -> Result<()> { + if cfg.target_record.is_none() || cfg.does_target_match(&record) { + sender + .send(Ok(record)) + .map_err(|_| NetworkError::InternalMsgChannelDropped) + } else { + sender + .send(Err(GetRecordError::RecordDoesNotMatch(record))) + .map_err(|_| NetworkError::InternalMsgChannelDropped) + } + } +} diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs new file mode 100644 index 0000000000..e4112d1549 --- /dev/null +++ b/sn_networking/src/event/mod.rs @@ -0,0 +1,273 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
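Before the contents of the new event/mod.rs, a sketch of the target check performed by `send_record_after_checking_target` above. The comparison used here (key and value equality) is an assumption; the crate's `GetRecordCfg::does_target_match` may apply different criteria:

use libp2p::kad::Record;

/// Accept the fetched record unless a target was configured and it differs.
fn record_matches_target(record: &Record, target: Option<&Record>) -> bool {
    match target {
        None => true, // nothing expected, accept whatever was fetched
        Some(expected) => expected.key == record.key && expected.value == record.value,
    }
}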
+ +mod kad; +mod request_response; +mod swarm; + +use crate::{driver::SwarmDriver, error::Result, CLOSE_GROUP_SIZE}; +use core::fmt; +use custom_debug::Debug as CustomDebug; +#[cfg(feature = "local-discovery")] +use libp2p::mdns; +use libp2p::{ + kad::{Record, RecordKey}, + request_response::ResponseChannel as PeerResponseChannel, + Multiaddr, PeerId, +}; + +use sn_protocol::{ + messages::{Query, Request, Response}, + NetworkAddress, PrettyPrintRecordKey, +}; +use sn_transfers::PaymentQuote; +use std::{ + collections::{BTreeSet, HashSet}, + fmt::{Debug, Formatter}, +}; +use tokio::sync::oneshot; + +/// NodeEvent enum +#[derive(CustomDebug)] +pub(super) enum NodeEvent { + MsgReceived(libp2p::request_response::Event), + Kademlia(libp2p::kad::Event), + #[cfg(feature = "local-discovery")] + Mdns(Box), + Identify(Box), + Dcutr(Box), + RelayClient(Box), + RelayServer(Box), +} + +impl From> for NodeEvent { + fn from(event: libp2p::request_response::Event) -> Self { + NodeEvent::MsgReceived(event) + } +} + +impl From for NodeEvent { + fn from(event: libp2p::kad::Event) -> Self { + NodeEvent::Kademlia(event) + } +} + +#[cfg(feature = "local-discovery")] +impl From for NodeEvent { + fn from(event: mdns::Event) -> Self { + NodeEvent::Mdns(Box::new(event)) + } +} + +impl From for NodeEvent { + fn from(event: libp2p::identify::Event) -> Self { + NodeEvent::Identify(Box::new(event)) + } +} +impl From for NodeEvent { + fn from(event: libp2p::dcutr::Event) -> Self { + NodeEvent::Dcutr(Box::new(event)) + } +} +impl From for NodeEvent { + fn from(event: libp2p::relay::client::Event) -> Self { + NodeEvent::RelayClient(Box::new(event)) + } +} +impl From for NodeEvent { + fn from(event: libp2p::relay::Event) -> Self { + NodeEvent::RelayServer(Box::new(event)) + } +} + +#[derive(CustomDebug)] +/// Channel to send the `Response` through. +pub enum MsgResponder { + /// Respond to a request from `self` through a simple one-shot channel. + FromSelf(Option>>), + /// Respond to a request from a peer in the network. + FromPeer(PeerResponseChannel), +} + +#[allow(clippy::large_enum_variant)] +/// Events forwarded by the underlying Network; to be used by the upper layers +pub enum NetworkEvent { + /// Incoming `Query` from a peer + QueryRequestReceived { + /// Query + query: Query, + /// The channel to send the `Response` through + channel: MsgResponder, + }, + /// Handles the responses that are not awaited at the call site + ResponseReceived { + /// Response + res: Response, + }, + /// Peer has been added to the Routing Table. And the number of connected peers. + PeerAdded(PeerId, usize), + /// Peer has been removed from the Routing Table. And the number of connected peers. + PeerRemoved(PeerId, usize), + /// The peer does not support our protocol + PeerWithUnsupportedProtocol { + our_protocol: String, + their_protocol: String, + }, + /// The peer is now considered as a bad node, due to the detected bad behaviour + PeerConsideredAsBad { + detected_by: PeerId, + bad_peer: PeerId, + bad_behaviour: String, + }, + /// The records bearing these keys are to be fetched from the holder or the network + KeysToFetchForReplication(Vec<(PeerId, RecordKey)>), + /// Started listening on a new address + NewListenAddr(Multiaddr), + /// Report unverified record + UnverifiedRecord(Record), + /// Terminate Node on unrecoverable errors + TerminateNode { reason: TerminateNodeReason }, + /// List of peer nodes that failed to fetch replication copy from. + FailedToFetchHolders(BTreeSet), + /// A peer in RT that supposed to be verified. 
+ BadNodeVerification { peer_id: PeerId }, + /// Quotes to be verified + QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, + /// Carry out chunk proof check against the specified record and peer + ChunkProofVerification { + peer_id: PeerId, + keys_to_verify: Vec, + }, +} + +/// Terminate node for the following reason +#[derive(Debug, Clone)] +pub enum TerminateNodeReason { + HardDiskWriteError, +} + +// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected. +impl Debug for NetworkEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + NetworkEvent::QueryRequestReceived { query, .. } => { + write!(f, "NetworkEvent::QueryRequestReceived({query:?})") + } + NetworkEvent::ResponseReceived { res, .. } => { + write!(f, "NetworkEvent::ResponseReceived({res:?})") + } + NetworkEvent::PeerAdded(peer_id, connected_peers) => { + write!(f, "NetworkEvent::PeerAdded({peer_id:?}, {connected_peers})") + } + NetworkEvent::PeerRemoved(peer_id, connected_peers) => { + write!( + f, + "NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})" + ) + } + NetworkEvent::PeerWithUnsupportedProtocol { + our_protocol, + their_protocol, + } => { + write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})") + } + NetworkEvent::PeerConsideredAsBad { + bad_peer, + bad_behaviour, + .. + } => { + write!( + f, + "NetworkEvent::PeerConsideredAsBad({bad_peer:?}, {bad_behaviour:?})" + ) + } + NetworkEvent::KeysToFetchForReplication(list) => { + let keys_len = list.len(); + write!(f, "NetworkEvent::KeysForReplication({keys_len:?})") + } + NetworkEvent::NewListenAddr(addr) => { + write!(f, "NetworkEvent::NewListenAddr({addr:?})") + } + NetworkEvent::UnverifiedRecord(record) => { + let pretty_key = PrettyPrintRecordKey::from(&record.key); + write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})") + } + NetworkEvent::TerminateNode { reason } => { + write!(f, "NetworkEvent::TerminateNode({reason:?})") + } + NetworkEvent::FailedToFetchHolders(bad_nodes) => { + write!(f, "NetworkEvent::FailedToFetchHolders({bad_nodes:?})") + } + NetworkEvent::BadNodeVerification { peer_id } => { + write!(f, "NetworkEvent::BadNodeVerification({peer_id:?})") + } + NetworkEvent::QuoteVerification { quotes } => { + write!( + f, + "NetworkEvent::QuoteVerification({} quotes)", + quotes.len() + ) + } + NetworkEvent::ChunkProofVerification { + peer_id, + keys_to_verify, + } => { + write!( + f, + "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})" + ) + } + } + } +} + +impl SwarmDriver { + /// Check for changes in our close group + pub(crate) fn check_for_change_in_our_close_group(&mut self) -> bool { + // this includes self + let closest_k_peers = self.get_closest_k_value_local_peers(); + + let new_closest_peers: Vec<_> = + closest_k_peers.into_iter().take(CLOSE_GROUP_SIZE).collect(); + + let old = self.close_group.iter().cloned().collect::>(); + let new_members: Vec<_> = new_closest_peers + .iter() + .filter(|p| !old.contains(p)) + .collect(); + if !new_members.is_empty() { + debug!("The close group has been updated. 
The new members are {new_members:?}"); + debug!("New close group: {new_closest_peers:?}"); + self.close_group = new_closest_peers; + true + } else { + false + } + } + + pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) { + let distance = NetworkAddress::from_peer(self.self_peer_id) + .distance(&NetworkAddress::from_peer(*peer)); + info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2()); + let mut kbucket_table_stats = vec![]; + let mut index = 0; + let mut total_peers = 0; + for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() { + let range = kbucket.range(); + total_peers += kbucket.num_entries(); + if let Some(distance) = range.0.ilog2() { + kbucket_table_stats.push((index, kbucket.num_entries(), distance)); + } else { + // This shall never happen. + error!("bucket #{index:?} is ourself ???!!!"); + } + index += 1; + } + info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}"); + } +} diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs new file mode 100644 index 0000000000..e0f102a9d1 --- /dev/null +++ b/sn_networking/src/event/request_response.rs @@ -0,0 +1,394 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::{ + sort_peers_by_address, MsgResponder, NetworkError, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, + REPLICATION_PEERS_COUNT, +}; +use itertools::Itertools; +use libp2p::{ + request_response::{self, Message}, + PeerId, +}; +use rand::{rngs::OsRng, Rng}; +use sn_protocol::{ + messages::{CmdResponse, Request, Response}, + storage::RecordType, + NetworkAddress, +}; + +impl SwarmDriver { + /// Forwards `Request` to the upper layers using `Sender`. Sends `Response` to the peers + pub(super) fn handle_req_resp_events( + &mut self, + event: request_response::Event, + ) -> Result<(), NetworkError> { + match event { + request_response::Event::Message { message, peer } => match message { + Message::Request { + request, + channel, + request_id, + .. + } => { + trace!("Received request {request_id:?} from peer {peer:?}, req: {request:?}"); + // If the request is replication or quote verification, + // we can handle it and send the OK response here. + // As the handle result is unimportant to the sender. + match request { + Request::Cmd(sn_protocol::messages::Cmd::Replicate { holder, keys }) => { + let response = Response::Cmd( + sn_protocol::messages::CmdResponse::Replicate(Ok(())), + ); + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, response) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + + self.add_keys_to_replication_fetcher(holder, keys); + } + Request::Cmd(sn_protocol::messages::Cmd::QuoteVerification { + quotes, + .. 
+ }) => { + let response = Response::Cmd( + sn_protocol::messages::CmdResponse::QuoteVerification(Ok(())), + ); + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, response) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + + // The keypair is required to verify the quotes, + // hence throw it up to Network layer for further actions. + let quotes = quotes + .iter() + .filter_map(|(peer_address, quote)| { + peer_address + .as_peer_id() + .map(|peer_id| (peer_id, quote.clone())) + }) + .collect(); + self.send_event(NetworkEvent::QuoteVerification { quotes }) + } + Request::Cmd(sn_protocol::messages::Cmd::PeerConsideredAsBad { + detected_by, + bad_peer, + bad_behaviour, + }) => { + let response = Response::Cmd( + sn_protocol::messages::CmdResponse::PeerConsideredAsBad(Ok(())), + ); + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, response) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + + if bad_peer == NetworkAddress::from_peer(self.self_peer_id) { + warn!("Peer {detected_by:?} consider us as BAD, due to {bad_behaviour:?}."); + // TODO: shall we terminate self after received such notifications + // from the majority close_group nodes around us? + } else { + error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us."); + } + } + Request::Query(query) => { + self.send_event(NetworkEvent::QueryRequestReceived { + query, + channel: MsgResponder::FromPeer(channel), + }) + } + } + } + Message::Response { + request_id, + response, + } => { + trace!("Got response {request_id:?} from peer {peer:?}, res: {response}."); + if let Some(sender) = self.pending_requests.remove(&request_id) { + // The sender will be provided if the caller (Requester) is awaiting for a response + // at the call site. + // Else the Request was just sent to the peer and the Response was + // meant to be handled in another way and is not awaited. + match sender { + Some(sender) => sender + .send(Ok(response)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?, + None => { + if let Response::Cmd(CmdResponse::Replicate(Ok(()))) = response { + // Nothing to do, response was fine + // This only exists to ensure we dont drop the handle and + // exit early, potentially logging false connection woes + } else { + // responses that are not awaited at the call site must be handled + // separately + self.send_event(NetworkEvent::ResponseReceived { + res: response, + }); + } + } + } + } else { + warn!("Tried to remove a RequestId from pending_requests which was not inserted in the first place. 
+ Use Cmd::SendRequest with sender:None if you want the Response to be fed into the common handle_response function"); + } + } + }, + request_response::Event::OutboundFailure { + request_id, + error, + peer, + } => { + if let Some(sender) = self.pending_requests.remove(&request_id) { + match sender { + Some(sender) => { + sender + .send(Err(error.into())) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + None => { + warn!("RequestResponse: OutboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); + return Err(NetworkError::ReceivedResponseDropped(request_id)); + } + } + } else { + warn!("RequestResponse: OutboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); + return Err(NetworkError::ReceivedResponseDropped(request_id)); + } + } + request_response::Event::InboundFailure { + peer, + request_id, + error, + } => { + warn!("RequestResponse: InboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}"); + } + request_response::Event::ResponseSent { peer, request_id } => { + trace!("ResponseSent for request_id: {request_id:?} and peer: {peer:?}"); + } + } + Ok(()) + } + + fn add_keys_to_replication_fetcher( + &mut self, + sender: NetworkAddress, + incoming_keys: Vec<(NetworkAddress, RecordType)>, + ) { + let holder = if let Some(peer_id) = sender.as_peer_id() { + peer_id + } else { + warn!("Replication list sender is not a peer_id {sender:?}"); + return; + }; + + trace!( + "Received replication list from {holder:?} of {} keys", + incoming_keys.len() + ); + + // accept replication requests from the K_VALUE peers away, + // giving us some margin for replication + let closest_k_peers = self.get_closest_k_value_local_peers(); + if !closest_k_peers.contains(&holder) || holder == self.self_peer_id { + trace!("Holder {holder:?} is self or not in replication range."); + return; + } + + // On receive a replication_list from a close_group peer, we undertake two tasks: + // 1, For those keys that we don't have: + // fetch them if close enough to us + // 2, For those keys that we have and supposed to be held by the sender as well: + // start chunk_proof check against a randomly selected chunk type record to the sender + + // For fetching, only handle those non-exist and in close range keys + let keys_to_store = + self.select_non_existent_records_for_replications(&incoming_keys, &closest_k_peers); + + if keys_to_store.is_empty() { + debug!("Empty keys to store after adding to"); + } else { + #[allow(clippy::mutable_key_type)] + let all_keys = self + .swarm + .behaviour_mut() + .kademlia + .store_mut() + .record_addresses_ref(); + let keys_to_fetch = self + .replication_fetcher + .add_keys(holder, keys_to_store, all_keys); + if keys_to_fetch.is_empty() { + trace!("no waiting keys to fetch from the network"); + } else { + self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); + } + } + + // Only trigger chunk_proof check when received a periodical replication request. 
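The replication intake above only accepts lists from peers that sit within our closest K-value peers and are not ourselves. A minimal sketch of that guard, using plain libp2p types and an illustrative name:

use libp2p::PeerId;

/// Should a replication list from `holder` be processed at all?
fn accept_replication_from(holder: PeerId, ourself: PeerId, closest_k_peers: &[PeerId]) -> bool {
    holder != ourself && closest_k_peers.contains(&holder)
}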
+ if incoming_keys.len() > 1 { + let keys_to_verify = self.select_verification_data_candidates(sender); + + if keys_to_verify.is_empty() { + debug!("No valid candidate to be checked against peer {holder:?}"); + } else { + self.send_event(NetworkEvent::ChunkProofVerification { + peer_id: holder, + keys_to_verify, + }); + } + } + } + + /// Checks suggested records against what we hold, so we only + /// enqueue what we do not have + fn select_non_existent_records_for_replications( + &mut self, + incoming_keys: &[(NetworkAddress, RecordType)], + closest_k_peers: &Vec, + ) -> Vec<(NetworkAddress, RecordType)> { + #[allow(clippy::mutable_key_type)] + let locally_stored_keys = self + .swarm + .behaviour_mut() + .kademlia + .store_mut() + .record_addresses_ref(); + let non_existent_keys: Vec<_> = incoming_keys + .iter() + .filter(|(addr, record_type)| { + let key = addr.to_record_key(); + let local = locally_stored_keys.get(&key); + + // if we have a local value of matching record_type, we don't need to fetch it + if let Some((_, local_record_type)) = local { + let not_same_type = local_record_type != record_type; + if not_same_type { + // Shall only happens for Register + info!("Record {addr:?} has different type: local {local_record_type:?}, incoming {record_type:?}"); + } + not_same_type + } else { + true + } + }) + .collect(); + + non_existent_keys + .into_iter() + .filter_map(|(key, record_type)| { + if Self::is_in_close_range(&self.self_peer_id, key, closest_k_peers) { + Some((key.clone(), record_type.clone())) + } else { + // Reduce the log level as there will always be around 40% records being + // out of the close range, as the sender side is using `CLOSE_GROUP_SIZE + 2` + // to send our replication list to provide addressing margin. + // Given there will normally be 6 nodes sending such list with interval of 5-10s, + // this will accumulate to a lot of logs with the increasing records uploaded. + trace!("not in close range for key {key:?}"); + None + } + }) + .collect() + } + + /// A close target doesn't falls into the close peers range: + /// For example, a node b11111X has an RT: [(1, b1111), (2, b111), (5, b11), (9, b1), (7, b0)] + /// Then for a target bearing b011111 as prefix, all nodes in (7, b0) are its close_group peers. + /// Then the node b11111X. But b11111X's close_group peers [(1, b1111), (2, b111), (5, b11)] + /// are none among target b011111's close range. + /// Hence, the ilog2 calculation based on close_range cannot cover such case. + /// And have to sort all nodes to figure out whether self is among the close_group to the target. + fn is_in_close_range( + our_peer_id: &PeerId, + target: &NetworkAddress, + all_peers: &Vec, + ) -> bool { + if all_peers.len() <= REPLICATION_PEERS_COUNT { + return true; + } + + // Margin of 2 to allow our RT being bit lagging. + match sort_peers_by_address(all_peers, target, REPLICATION_PEERS_COUNT) { + Ok(close_group) => close_group.contains(&our_peer_id), + Err(err) => { + warn!("Could not get sorted peers for {target:?} with error {err:?}"); + true + } + } + } + + /// Check among all chunk type records that we have, select those close to the peer, + /// and randomly pick one as the verification candidate. 
+ #[allow(clippy::mutable_key_type)] + fn select_verification_data_candidates(&mut self, peer: NetworkAddress) -> Vec { + let mut closest_peers = self + .swarm + .behaviour_mut() + .kademlia + .get_closest_local_peers(&self.self_peer_id.into()) + .map(|peer| peer.into_preimage()) + .take(20) + .collect_vec(); + closest_peers.push(self.self_peer_id); + + let target_peer = if let Some(peer_id) = peer.as_peer_id() { + peer_id + } else { + error!("Target {peer:?} is not a valid PeerId"); + return vec![]; + }; + + #[allow(clippy::mutable_key_type)] + let all_keys = self + .swarm + .behaviour_mut() + .kademlia + .store_mut() + .record_addresses_ref(); + + // Targeted chunk type record shall be expected within the close range from our perspective. + let mut verify_candidates: Vec = all_keys + .values() + .filter_map(|(addr, record_type)| { + if RecordType::Chunk == *record_type { + match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { + Ok(close_group) => { + if close_group.contains(&&target_peer) { + Some(addr.clone()) + } else { + None + } + } + Err(err) => { + warn!("Could not get sorted peers for {addr:?} with error {err:?}"); + None + } + } + } else { + None + } + }) + .collect(); + + verify_candidates.sort_by_key(|a| peer.distance(a)); + + // To ensure the candidate mush have to be held by the peer, + // we only carry out check when there are already certain amount of chunks uploaded + // AND choose candidate from certain reduced range. + if verify_candidates.len() > 50 { + let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); + vec![verify_candidates[index].clone()] + } else { + vec![] + } + } +} diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs new file mode 100644 index 0000000000..70b6681271 --- /dev/null +++ b/sn_networking/src/event/swarm.rs @@ -0,0 +1,644 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
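Before the new event/swarm.rs contents, a sketch of the candidate selection rule in `select_verification_data_candidates` above: a chunk is only sampled for proof checking once a reasonable number are held, and only from the closer half of the distance-sorted list. Names here are illustrative:

use rand::{rngs::OsRng, Rng};

/// Pick one verification candidate, or none if too few are held.
fn pick_verification_candidate<T: Clone>(sorted_candidates: &[T]) -> Option<T> {
    if sorted_candidates.len() > 50 {
        // Sample only from the closer half so the peer is very likely to hold it.
        let index = OsRng.gen_range(0..(sorted_candidates.len() / 2));
        Some(sorted_candidates[index].clone())
    } else {
        None
    }
}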
+ +use crate::{ + cmd::SwarmCmd, event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, + target_arch::Instant, NetworkEvent, Result, SwarmDriver, +}; +use itertools::Itertools; +#[cfg(feature = "local-discovery")] +use libp2p::mdns; +use libp2p::{ + kad::K_VALUE, + multiaddr::Protocol, + swarm::{ + dial_opts::{DialOpts, PeerCondition}, + DialError, SwarmEvent, + }, + Multiaddr, PeerId, TransportError, +}; +use sn_protocol::{ + get_port_from_multiaddr, + version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}, +}; +use std::collections::HashSet; +use tokio::time::Duration; + +impl SwarmDriver { + /// Handle `SwarmEvents` + pub(crate) fn handle_swarm_events(&mut self, event: SwarmEvent) -> Result<()> { + let start = Instant::now(); + let event_string; + match event { + SwarmEvent::Behaviour(NodeEvent::MsgReceived(event)) => { + event_string = "msg_received"; + if let Err(e) = self.handle_req_resp_events(event) { + warn!("MsgReceivedError: {e:?}"); + } + } + SwarmEvent::Behaviour(NodeEvent::Kademlia(kad_event)) => { + event_string = "kad_event"; + self.handle_kad_event(kad_event)?; + } + SwarmEvent::Behaviour(NodeEvent::Dcutr(event)) => { + event_string = "dcutr_event"; + info!( + "Dcutr with remote peer: {:?} is: {:?}", + event.remote_peer_id, event.result + ); + } + SwarmEvent::Behaviour(NodeEvent::RelayClient(event)) => { + event_string = "relay_client_event"; + + info!(?event, "relay client event"); + + if let libp2p::relay::client::Event::ReservationReqAccepted { + relay_peer_id, .. + } = *event + { + self.relay_manager + .on_successful_reservation_by_client(&relay_peer_id, &mut self.swarm); + } + } + + SwarmEvent::Behaviour(NodeEvent::RelayServer(event)) => { + event_string = "relay_server_event"; + + info!(?event, "relay server event"); + + match *event { + libp2p::relay::Event::ReservationReqAccepted { + src_peer_id, + renewed: _, + } => { + self.relay_manager + .on_successful_reservation_by_server(src_peer_id); + } + libp2p::relay::Event::ReservationTimedOut { src_peer_id } => { + self.relay_manager.on_reservation_timeout(src_peer_id); + } + _ => {} + } + } + SwarmEvent::Behaviour(NodeEvent::Identify(iden)) => { + event_string = "identify"; + + match *iden { + libp2p::identify::Event::Received { peer_id, info } => { + trace!(%peer_id, ?info, "identify: received info"); + + if info.protocol_version != IDENTIFY_PROTOCOL_STR.to_string() { + warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {:?}", IDENTIFY_PROTOCOL_STR.as_str()); + + self.send_event(NetworkEvent::PeerWithUnsupportedProtocol { + our_protocol: IDENTIFY_PROTOCOL_STR.to_string(), + their_protocol: info.protocol_version, + }); + + return Ok(()); + } + + // if client, return. + if info.agent_version != IDENTIFY_NODE_VERSION_STR.to_string() { + return Ok(()); + } + + let has_dialed = self.dialed_peers.contains(&peer_id); + + // If we're not in local mode, only add globally reachable addresses. + // Strip the `/p2p/...` part of the multiaddresses. + // Collect into a HashSet directly to avoid multiple allocations and handle deduplication. 
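The address collection that follows relies on the crate helpers `multiaddr_strip_p2p` and `multiaddr_is_global`. As an aside, stripping the `/p2p/<peer-id>` suffix can be done with plain libp2p types roughly like this (a sketch; the crate's helper may differ in detail):

use libp2p::{multiaddr::Protocol, Multiaddr};

/// Drop a trailing /p2p/<peer-id> component so addresses dedupe cleanly.
fn strip_p2p(addr: &Multiaddr) -> Multiaddr {
    addr.iter()
        .filter(|p| !matches!(p, Protocol::P2p(_)))
        .collect()
}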
+ let addrs: HashSet = match self.local { + true => info + .listen_addrs + .into_iter() + .map(|addr| multiaddr_strip_p2p(&addr)) + .collect(), + false => info + .listen_addrs + .into_iter() + .filter(multiaddr_is_global) + .map(|addr| multiaddr_strip_p2p(&addr)) + .collect(), + }; + + self.relay_manager.add_potential_candidates( + &peer_id, + &addrs, + &info.protocols, + ); + + // When received an identify from un-dialed peer, try to dial it + // The dial shall trigger the same identify to be sent again and confirm + // peer is external accessible, hence safe to be added into RT. + if !self.local && !has_dialed { + // Only need to dial back for not fulfilled kbucket + let (kbucket_full, already_present_in_rt, ilog2) = + if let Some(kbucket) = + self.swarm.behaviour_mut().kademlia.kbucket(peer_id) + { + let ilog2 = kbucket.range().0.ilog2(); + let num_peers = kbucket.num_entries(); + let mut is_bucket_full = num_peers >= K_VALUE.into(); + + // check if peer_id is already a part of RT + let already_present_in_rt = kbucket + .iter() + .any(|entry| entry.node.key.preimage() == &peer_id); + + // If the bucket contains any of a bootstrap node, + // consider the bucket is not full and dial back + // so that the bootstrap nodes can be replaced. + if is_bucket_full { + if let Some(peers) = self.bootstrap_peers.get(&ilog2) { + if kbucket.iter().any(|entry| { + peers.contains(entry.node.key.preimage()) + }) { + is_bucket_full = false; + } + } + } + + (is_bucket_full, already_present_in_rt, ilog2) + } else { + return Ok(()); + }; + + if kbucket_full { + trace!("received identify for a full bucket {ilog2:?}, not dialing {peer_id:?} on {addrs:?}"); + return Ok(()); + } else if already_present_in_rt { + trace!("received identify for {peer_id:?} that is already part of the RT. Not dialing {peer_id:?} on {addrs:?}"); + return Ok(()); + } + + info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible"); + if let Err(err) = self.swarm.dial( + DialOpts::peer_id(peer_id) + .condition(PeerCondition::NotDialing) + .addresses(addrs.iter().cloned().collect()) + .build(), + ) { + warn!(%peer_id, ?addrs, "dialing error: {err:?}"); + } + + trace!( + "SwarmEvent handled in {:?}: {event_string:?}", + start.elapsed() + ); + return Ok(()); + } + + // If we are not local, we care only for peers that we dialed and thus are reachable. + if self.local || has_dialed { + // To reduce the bad_node check resource usage, + // during the connection establish process, only check cached black_list + // The periodical check, which involves network queries shall filter + // out bad_nodes eventually. + if let Some((_issues, true)) = self.bad_nodes.get(&peer_id) { + info!("Peer {peer_id:?} is considered as bad, blocking it."); + } else { + self.remove_bootstrap_from_full(peer_id); + + trace!(%peer_id, ?addrs, "identify: attempting to add addresses to routing table"); + + // Attempt to add the addresses to the routing table. + for multiaddr in addrs { + let _routing_update = self + .swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, multiaddr); + } + } + } + trace!( + "SwarmEvent handled in {:?}: {event_string:?}", + start.elapsed() + ); + } + // Log the other Identify events. + libp2p::identify::Event::Sent { .. } => trace!("identify: {iden:?}"), + libp2p::identify::Event::Pushed { .. } => trace!("identify: {iden:?}"), + libp2p::identify::Event::Error { .. 
} => trace!("identify: {iden:?}"), + } + } + #[cfg(feature = "local-discovery")] + SwarmEvent::Behaviour(NodeEvent::Mdns(mdns_event)) => { + event_string = "mdns"; + match *mdns_event { + mdns::Event::Discovered(list) => { + if self.local { + for (peer_id, addr) in list { + // The multiaddr does not contain the peer ID, so add it. + let addr = addr.with(Protocol::P2p(peer_id)); + + info!(%addr, "mDNS node discovered and dialing"); + + if let Err(err) = self.dial(addr.clone()) { + warn!(%addr, "mDNS node dial error: {err:?}"); + } + } + } + } + mdns::Event::Expired(peer) => { + trace!("mdns peer {peer:?} expired"); + } + } + } + + SwarmEvent::NewListenAddr { + address, + listener_id, + } => { + event_string = "new listen addr"; + + // update our stored port if it is configured to be 0 or None + match self.listen_port { + Some(0) | None => { + if let Some(actual_port) = get_port_from_multiaddr(&address) { + info!("Our listen port is configured as 0 or is not set. Setting it to our actual port: {actual_port}"); + self.listen_port = Some(actual_port); + } + } + _ => {} + }; + + let local_peer_id = *self.swarm.local_peer_id(); + let address = address.with(Protocol::P2p(local_peer_id)); + + // Trigger server mode if we're not a client and we should not add our own address if we're behind + // home network. + if !self.is_client && !self.is_behind_home_network { + if self.local { + // all addresses are effectively external here... + // this is needed for Kad Mode::Server + self.swarm.add_external_address(address.clone()); + } else { + // only add our global addresses + if multiaddr_is_global(&address) { + self.swarm.add_external_address(address.clone()); + } + } + } + + self.send_event(NetworkEvent::NewListenAddr(address.clone())); + + info!("Local node is listening {listener_id:?} on {address:?}"); + } + SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + } => { + event_string = "listener closed"; + info!("Listener {listener_id:?} with add {addresses:?} has been closed for {reason:?}"); + self.relay_manager + .on_listener_closed(&listener_id, &mut self.swarm); + } + SwarmEvent::IncomingConnection { + connection_id, + local_addr, + send_back_addr, + } => { + event_string = "incoming"; + trace!("IncomingConnection ({connection_id:?}) with local_addr: {local_addr:?} send_back_addr: {send_back_addr:?}"); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + connection_id, + concurrent_dial_errors, + established_in, + } => { + event_string = "ConnectionEstablished"; + trace!(%peer_id, num_established, ?concurrent_dial_errors, "ConnectionEstablished ({connection_id:?}) in {established_in:?}: {}", endpoint_str(&endpoint)); + + let _ = self.live_connected_peers.insert( + connection_id, + (peer_id, Instant::now() + Duration::from_secs(60)), + ); + + if endpoint.is_dialer() { + self.dialed_peers.push(peer_id); + } + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + cause, + num_established, + connection_id, + } => { + event_string = "ConnectionClosed"; + trace!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint)); + let _ = self.live_connected_peers.remove(&connection_id); + } + SwarmEvent::OutgoingConnectionError { + connection_id, + peer_id: None, + error, + } => { + event_string = "OutgoingConnErr"; + warn!("OutgoingConnectionError to on {connection_id:?} - {error:?}"); + } + SwarmEvent::OutgoingConnectionError { + peer_id: Some(failed_peer_id), + error, + connection_id, + } => { + event_string = 
"OutgoingConnErr"; + warn!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}"); + + // we need to decide if this was a critical error and the peer should be removed from the routing table + let should_clean_peer = match error { + DialError::Transport(errors) => { + // as it's an outgoing error, if it's transport based we can assume it is _our_ fault + // + // (eg, could not get a port for a tcp connection) + // so we default to it not being a real issue + // unless there are _specific_ errors (connection refused eg) + error!("Dial errors len : {:?}", errors.len()); + let mut there_is_a_serious_issue = false; + for (_addr, err) in errors { + error!("OutgoingTransport error : {err:?}"); + + match err { + TransportError::MultiaddrNotSupported(addr) => { + warn!("Multiaddr not supported : {addr:?}"); + // if we can't dial a peer on a given address, we should remove it from the routing table + there_is_a_serious_issue = true + } + TransportError::Other(err) => { + let problematic_errors = [ + "ConnectionRefused", + "HostUnreachable", + "HandshakeTimedOut", + ]; + + let is_bootstrap_peer = self + .bootstrap_peers + .iter() + .any(|(_ilog2, peers)| peers.contains(&failed_peer_id)); + + if is_bootstrap_peer + && self.connected_peers < self.bootstrap_peers.len() + { + warn!("OutgoingConnectionError: On bootstrap peer {failed_peer_id:?}, while still in bootstrap mode, ignoring"); + there_is_a_serious_issue = false; + } else { + // It is really difficult to match this error, due to being eg: + // Custom { kind: Other, error: Left(Left(Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })) } + // if we can match that, let's. But meanwhile we'll check the message + let error_msg = format!("{err:?}"); + if problematic_errors + .iter() + .any(|err| error_msg.contains(err)) + { + warn!("Problematic error encountered: {error_msg}"); + there_is_a_serious_issue = true; + } + } + } + } + } + there_is_a_serious_issue + } + DialError::NoAddresses => { + // We provided no address, and while we can't really blame the peer + // we also can't connect, so we opt to cleanup... + warn!("OutgoingConnectionError: No address provided"); + true + } + DialError::Aborted => { + // not their fault + warn!("OutgoingConnectionError: Aborted"); + false + } + DialError::DialPeerConditionFalse(_) => { + // we could not dial due to an internal condition, so not their issue + warn!("OutgoingConnectionError: DialPeerConditionFalse"); + false + } + DialError::LocalPeerId { endpoint, .. } => { + // This is actually _us_ So we should remove this from the RT + error!( + "OutgoingConnectionError: LocalPeerId: {}", + endpoint_str(&endpoint) + ); + true + } + DialError::WrongPeerId { obtained, endpoint } => { + // The peer id we attempted to dial was not the one we expected + // cleanup + error!("OutgoingConnectionError: WrongPeerId: obtained: {obtained:?}, endpoint: {endpoint:?}"); + true + } + DialError::Denied { cause } => { + // The peer denied our connection + // cleanup + error!("OutgoingConnectionError: Denied: {cause:?}"); + true + } + }; + + if should_clean_peer { + warn!("Tracking issue of {failed_peer_id:?}. 
Clearing it out for now"); + + if let Some(dead_peer) = self + .swarm + .behaviour_mut() + .kademlia + .remove_peer(&failed_peer_id) + { + self.connected_peers = self.connected_peers.saturating_sub(1); + + self.handle_cmd(SwarmCmd::RecordNodeIssue { + peer_id: failed_peer_id, + issue: crate::NodeIssue::ConnectionIssue, + })?; + + self.send_event(NetworkEvent::PeerRemoved( + *dead_peer.node.key.preimage(), + self.connected_peers, + )); + + self.log_kbuckets(&failed_peer_id); + let _ = self.check_for_change_in_our_close_group(); + } + } + } + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + } => { + event_string = "Incoming ConnErr"; + error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); + } + SwarmEvent::Dialing { + peer_id, + connection_id, + } => { + event_string = "Dialing"; + trace!("Dialing {peer_id:?} on {connection_id:?}"); + } + SwarmEvent::NewExternalAddrCandidate { address } => { + event_string = "NewExternalAddrCandidate"; + + if !self.swarm.external_addresses().any(|addr| addr == &address) + && !self.is_client + // If we are behind a home network, then our IP is returned here. We should be only having + // relay server as our external address + // todo: can our relay address be reported here? If so, maybe we should add them. + && !self.is_behind_home_network + { + debug!(%address, "external address: new candidate"); + + // Identify will let us know when we have a candidate. (Peers will tell us what address they see us as.) + // We manually confirm this to be our externally reachable address, though in theory it's possible we + // are not actually reachable. This event returns addresses with ports that were not set by the user, + // so we must not add those ports as they will not be forwarded. + // Setting this will also switch kad to server mode if it's not already in it. + if let Some(our_port) = self.listen_port { + if let Some(port) = get_port_from_multiaddr(&address) { + if port == our_port { + info!(%address, "external address: new candidate has the same configured port, adding it."); + self.swarm.add_external_address(address); + } else { + info!(%address, %our_port, "external address: new candidate has a different port, not adding it."); + } + } + } else { + trace!("external address: listen port not set. This has to be set if you're running a node"); + } + } + let all_external_addresses = self.swarm.external_addresses().collect_vec(); + let all_listeners = self.swarm.listeners().collect_vec(); + debug!("All our listeners: {all_listeners:?}"); + debug!("All our external addresses: {all_external_addresses:?}"); + } + SwarmEvent::ExternalAddrConfirmed { address } => { + event_string = "ExternalAddrConfirmed"; + info!(%address, "external address: confirmed"); + } + SwarmEvent::ExternalAddrExpired { address } => { + event_string = "ExternalAddrExpired"; + info!(%address, "external address: expired"); + } + other => { + event_string = "Other"; + + trace!("SwarmEvent has been ignored: {other:?}") + } + } + self.remove_outdated_connections(); + + self.log_handling(event_string.to_string(), start.elapsed()); + + trace!( + "SwarmEvent handled in {:?}: {event_string:?}", + start.elapsed() + ); + Ok(()) + } + + // if target bucket is full, remove a bootstrap node if presents. 
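The transport-error triage above cannot match the underlying io error structurally, so it falls back to substring checks on the Debug output. A compact sketch of that check with an illustrative name:

/// True if the Debug output of a dial error looks like a peer-side problem.
fn is_problematic_transport_error<E: std::fmt::Debug>(err: &E) -> bool {
    let msg = format!("{err:?}");
    ["ConnectionRefused", "HostUnreachable", "HandshakeTimedOut"]
        .iter()
        .any(|needle| msg.contains(needle))
}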
+ fn remove_bootstrap_from_full(&mut self, peer_id: PeerId) { + let mut shall_removed = None; + + if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) { + if kbucket.num_entries() >= K_VALUE.into() { + if let Some(peers) = self.bootstrap_peers.get(&kbucket.range().0.ilog2()) { + for peer_entry in kbucket.iter() { + if peers.contains(peer_entry.node.key.preimage()) { + shall_removed = Some(*peer_entry.node.key.preimage()); + break; + } + } + } + } + } + if let Some(to_be_removed_bootstrap) = shall_removed { + trace!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}"); + let _entry = self + .swarm + .behaviour_mut() + .kademlia + .remove_peer(&to_be_removed_bootstrap); + } + } + + // Remove outdated connection to a peer if it is not in the RT. + fn remove_outdated_connections(&mut self) { + let mut shall_removed = vec![]; + + let timed_out_connections = + self.live_connected_peers + .iter() + .filter_map(|(connection_id, (peer_id, timeout))| { + if Instant::now() > *timeout { + Some((connection_id, peer_id)) + } else { + None + } + }); + + for (connection_id, peer_id) in timed_out_connections { + // Skip if the peer is present in our RT + if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(*peer_id) { + if kbucket + .iter() + .any(|peer_entry| *peer_id == *peer_entry.node.key.preimage()) + { + continue; + } + } + + // skip if the peer is a relay server that we're connected to + if self.relay_manager.keep_alive_peer(peer_id) { + continue; + } + + shall_removed.push((*connection_id, *peer_id)); + } + + if !shall_removed.is_empty() { + trace!( + "Current libp2p peers pool stats is {:?}", + self.swarm.network_info() + ); + trace!( + "Removing {} outdated live connections, still have {} left.", + shall_removed.len(), + self.live_connected_peers.len() + ); + trace!(?self.relay_manager); + + for (connection_id, peer_id) in shall_removed { + let _ = self.live_connected_peers.remove(&connection_id); + let result = self.swarm.close_connection(connection_id); + trace!("Removed outdated connection {connection_id:?} to {peer_id:?} with result: {result:?}"); + } + } + } +} + +/// Helper function to print formatted connection role info. +fn endpoint_str(endpoint: &libp2p::core::ConnectedPoint) -> String { + match endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => { + format!("outgoing ({address})") + } + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + format!("incoming ({send_back_addr})") + } + } +} diff --git a/sn_networking/src/get_record_handler.rs b/sn_networking/src/get_record_handler.rs deleted file mode 100644 index 574ad32293..0000000000 --- a/sn_networking/src/get_record_handler.rs +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. 
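A sketch of the connection-expiry scan in `remove_outdated_connections` above, written against std types for brevity (the crate uses its `target_arch::Instant`); below it follows the removed get_record_handler.rs, whose logic now lives in event/kad.rs:

use std::collections::HashMap;
use std::time::Instant;

/// Connections whose keep-alive deadline has passed; callers still skip peers
/// that remain in the routing table or act as relay servers for us.
fn expired_connections<K: Copy, P: Clone>(
    live: &HashMap<K, (P, Instant)>,
    now: Instant,
) -> Vec<(K, P)> {
    live.iter()
        .filter(|(_, (_, deadline))| now > *deadline)
        .map(|(id, (peer, _))| (*id, peer.clone()))
        .collect()
}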
- -use crate::{ - get_quorum_value, GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, - CLOSE_GROUP_SIZE, -}; -use libp2p::{ - kad::{self, PeerRecord, ProgressStep, QueryId, QueryStats, Record}, - PeerId, -}; -use sn_protocol::PrettyPrintRecordKey; -use std::collections::{hash_map::Entry, HashMap, HashSet}; -use tokio::sync::oneshot; -use xor_name::XorName; - -/// Using XorName to differentiate different record content under the same key. -type GetRecordResultMap = HashMap)>; -pub(crate) type PendingGetRecord = HashMap< - QueryId, - ( - oneshot::Sender>, - GetRecordResultMap, - GetRecordCfg, - ), ->; - -// For `get_record` returning behaviour: -// 1, targeting a non-existing entry -// there will only be one event of `kad::Event::OutboundQueryProgressed` -// with `ProgressStep::last` to be `true` -// `QueryStats::requests` to be 20 (K-Value) -// `QueryStats::success` to be over majority of the requests -// `err::NotFound::closest_peers` contains a list of CLOSE_GROUP_SIZE peers -// 2, targeting an existing entry -// there will a sequence of (at least CLOSE_GROUP_SIZE) events of -// `kad::Event::OutboundQueryProgressed` to be received -// with `QueryStats::end` always being `None` -// `ProgressStep::last` all to be `false` -// `ProgressStep::count` to be increased with step of 1 -// capped and stopped at CLOSE_GROUP_SIZE, may have duplicated counts -// `PeerRecord::peer` could be None to indicate from self -// in which case it always use a duplicated `ProgressStep::count` -// the sequence will be completed with `FinishedWithNoAdditionalRecord` -// where: `cache_candidates`: being the peers supposed to hold the record but not -// `ProgressStep::count`: to be `number of received copies plus one` -// `ProgressStep::last` to be `true` -impl SwarmDriver { - // Accumulates the GetRecord query results - // If we get enough responses (quorum) for a record with the same content hash: - // - we return the Record after comparing with the target record. This might return RecordDoesNotMatch if the - // check fails. - // - if multiple content hashes are found, we return a SplitRecord Error - // And then we stop the kad query as we are done here. - pub(crate) fn accumulate_get_record_found( - &mut self, - query_id: QueryId, - peer_record: PeerRecord, - _stats: QueryStats, - step: ProgressStep, - ) -> Result<()> { - let peer_id = if let Some(peer_id) = peer_record.peer { - peer_id - } else { - self.self_peer_id - }; - let pretty_key = PrettyPrintRecordKey::from(&peer_record.record.key).into_owned(); - - if let Entry::Occupied(mut entry) = self.pending_get_record.entry(query_id) { - let (_sender, result_map, cfg) = entry.get_mut(); - - if !cfg.expected_holders.is_empty() { - if cfg.expected_holders.remove(&peer_id) { - debug!("For record {pretty_key:?} task {query_id:?}, received a copy from an expected holder {peer_id:?}"); - } else { - debug!("For record {pretty_key:?} task {query_id:?}, received a copy from an unexpected holder {peer_id:?}"); - } - } - - // Insert the record and the peer into the result_map. 
- let record_content_hash = XorName::from_content(&peer_record.record.value); - let responded_peers = - if let Entry::Occupied(mut entry) = result_map.entry(record_content_hash) { - let (_, peer_list) = entry.get_mut(); - let _ = peer_list.insert(peer_id); - peer_list.len() - } else { - let mut peer_list = HashSet::new(); - let _ = peer_list.insert(peer_id); - result_map.insert(record_content_hash, (peer_record.record.clone(), peer_list)); - 1 - }; - - let expected_answers = get_quorum_value(&cfg.get_quorum); - - trace!("Expecting {expected_answers:?} answers for record {pretty_key:?} task {query_id:?}, received {responded_peers} so far"); - - if responded_peers >= expected_answers { - if !cfg.expected_holders.is_empty() { - debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with non-responded expected holders {:?}", cfg.expected_holders); - } - let cfg = cfg.clone(); - - // Remove the query task and consume the variables. - let (sender, result_map, _) = entry.remove(); - - if result_map.len() == 1 { - Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?; - } else { - debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record"); - sender - .send(Err(GetRecordError::SplitRecord { result_map })) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - - // Stop the query; possibly stops more nodes from being queried. - if let Some(mut query) = self.swarm.behaviour_mut().kademlia.query_mut(&query_id) { - query.finish(); - } - } else if usize::from(step.count) >= CLOSE_GROUP_SIZE { - debug!("For record {pretty_key:?} task {query_id:?}, got {:?} with {} versions so far.", - step.count, result_map.len()); - } - } else { - // return error if the entry cannot be found - return Err(NetworkError::ReceivedKademliaEventDropped { - query_id, - event: format!("Accumulate Get Record of {pretty_key:?}"), - }); - } - Ok(()) - } - - // Handles the possible cases when a GetRecord Query completes. - // The accumulate_get_record_found returns the record if the quorum is satisfied, but, if we have reached this point - // then we did not get enough records or we got split records (which prevented the quorum to pass). - // Returns the following errors: - // RecordNotFound if the result_map is empty. - // NotEnoughCopies if there is only a single content hash version. - // SplitRecord if there are multiple content hash versions. 
-    pub(crate) fn handle_get_record_finished(
-        &mut self,
-        query_id: QueryId,
-        step: ProgressStep,
-    ) -> Result<()> {
-        // Return an error if the entry cannot be found.
-        if let Some((sender, result_map, cfg)) = self.pending_get_record.remove(&query_id) {
-            let num_of_versions = result_map.len();
-            let (result, log_string) = if let Some((record, from_peers)) =
-                result_map.values().next()
-            {
-                let result = if num_of_versions == 1 {
-                    Err(GetRecordError::NotEnoughCopies {
-                        record: record.clone(),
-                        expected: get_quorum_value(&cfg.get_quorum),
-                        got: from_peers.len(),
-                    })
-                } else {
-                    Err(GetRecordError::SplitRecord {
-                        result_map: result_map.clone(),
-                    })
-                };
-
-                (
-                    result,
-                    format!("Getting record {:?} completed with only {:?} copies received, and {num_of_versions} versions.",
-                        PrettyPrintRecordKey::from(&record.key), usize::from(step.count) - 1)
-                )
-            } else {
-                (
-                    Err(GetRecordError::RecordNotFound),
-                    format!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count),
-                )
-            };
-
-            if cfg.expected_holders.is_empty() {
-                debug!("{log_string}");
-            } else {
-                debug!(
-                    "{log_string}, and {:?} expected holders not responded",
-                    cfg.expected_holders
-                );
-            }
-
-            sender
-                .send(result)
-                .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
-        } else {
-            // We manually perform `query.finish()` if we return early from the accumulate fn.
-            // Thus we will still get FinishedWithNoAdditionalRecord here.
-            trace!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender.");
-        }
-        Ok(())
-    }
-
-    /// Handles the possible cases when a kad GetRecord returns an error.
-    /// If we get NotFound/QuorumFailed, we return a RecordNotFound error. Kad currently does not enforce any quorum.
-    /// If we get a Timeout:
-    ///   - return a QueryTimeout if we got a split record, i.e. multiple content hashes;
-    ///   - if the quorum is satisfied, return the record after comparing it with the target record,
-    ///     which might return RecordDoesNotMatch if the check fails;
-    ///   - else return a QueryTimeout error.
-    pub(crate) fn handle_get_record_error(
-        &mut self,
-        query_id: QueryId,
-        get_record_err: kad::GetRecordError,
-        _stats: QueryStats,
-        _step: ProgressStep,
-    ) -> Result<()> {
-        match &get_record_err {
-            kad::GetRecordError::NotFound { .. } | kad::GetRecordError::QuorumFailed { .. } => {
-                // Return an error if the entry cannot be found.
-                let (sender, _, cfg) =
-                    self.pending_get_record.remove(&query_id).ok_or_else(|| {
-                        trace!("Can't locate query task {query_id:?}, it has likely been completed already.");
-                        NetworkError::ReceivedKademliaEventDropped {
-                            query_id,
-                            event: "GetRecordError NotFound or QuorumFailed".to_string(),
-                        }
-                    })?;
-
-                if cfg.expected_holders.is_empty() {
-                    info!("Get record task {query_id:?} failed with error {get_record_err:?}");
-                } else {
-                    debug!("Get record task {query_id:?} failed with {:?} expected holders not responded, error {get_record_err:?}", cfg.expected_holders);
-                }
-                sender
-                    .send(Err(GetRecordError::RecordNotFound))
-                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
-            }
-            kad::GetRecordError::Timeout { key } => {
-                // Return an error if the entry cannot be found.
-                let pretty_key = PrettyPrintRecordKey::from(key);
-                let (sender, result_map, cfg) =
-                    self.pending_get_record.remove(&query_id).ok_or_else(|| {
-                        trace!(
-                            "Can't locate query task {query_id:?} for {pretty_key:?}, it has likely been completed already."
-                        );
-                        NetworkError::ReceivedKademliaEventDropped {
-                            query_id,
-                            event: format!("GetRecordError Timeout {pretty_key:?}"),
-                        }
-                    })?;
-
-                let required_response_count = get_quorum_value(&cfg.get_quorum);
-
-                // If we have a split over the result xorname, we don't attempt to resolve it here.
-                // Retry and resolve through the normal flows without a timeout.
-                // TODO: is the above still the case? Why don't we return a split record error?
-                if result_map.len() > 1 {
-                    warn!(
-                        "Get record task {query_id:?} for {pretty_key:?} timed out with split result map"
-                    );
-                    sender
-                        .send(Err(GetRecordError::QueryTimeout))
-                        .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
-
-                    return Ok(());
-                }
-
-                // If we have enough responses here, we can return the record.
-                if let Some((record, peers)) = result_map.values().next() {
-                    if peers.len() >= required_response_count {
-                        Self::send_record_after_checking_target(sender, record.clone(), &cfg)?;
-                        return Ok(());
-                    }
-                }
-
-                warn!("Get record task {query_id:?} for {pretty_key:?} returned insufficient responses. {:?} did not return record", cfg.expected_holders);
-                // Otherwise report the timeout.
-                sender
-                    .send(Err(GetRecordError::QueryTimeout))
-                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
-            }
-        }
-
-        Ok(())
-    }
-
-    fn send_record_after_checking_target(
-        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
-        record: Record,
-        cfg: &GetRecordCfg,
-    ) -> Result<()> {
-        if cfg.target_record.is_none() || cfg.does_target_match(&record) {
-            sender
-                .send(Ok(record))
-                .map_err(|_| NetworkError::InternalMsgChannelDropped)
-        } else {
-            sender
-                .send(Err(GetRecordError::RecordDoesNotMatch(record)))
-                .map_err(|_| NetworkError::InternalMsgChannelDropped)
-        }
-    }
-}
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index afbf50930d..75d51ab1e2 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -15,7 +15,6 @@ mod cmd;
 mod driver;
 mod error;
 mod event;
-mod get_record_handler;
 mod log_markers;
 #[cfg(feature = "open-metrics")]
 mod metrics;
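
The heart of `accumulate_get_record_found` above is the per-content-hash bookkeeping: copies are grouped by the hash of the record value, a quorum is checked per content version, and a second version present at quorum time means a split record. Below is a minimal, self-contained sketch of that idea only; `PeerId`, `ContentHash`, `Record`, `GetOutcome`, `Accumulator` and the `DefaultHasher`-based hash are simplified stand-ins for illustration, not the sn_networking types (which key the map by `XorName::from_content`).

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

type PeerId = u64;
type ContentHash = u64;

#[derive(Clone, Debug)]
struct Record {
    key: Vec<u8>,
    value: Vec<u8>,
}

#[derive(Debug)]
enum GetOutcome {
    /// Quorum reached for a single content version.
    Done(Record),
    /// Quorum reached, but peers returned different contents for the same key.
    SplitRecord,
    /// Still waiting for more copies.
    Pending,
}

#[derive(Default)]
struct Accumulator {
    // content hash -> (record, peers that returned this exact content)
    result_map: HashMap<ContentHash, (Record, HashSet<PeerId>)>,
}

impl Accumulator {
    // Stand-in for hashing the record content (`XorName::from_content` in the crate).
    fn content_hash(value: &[u8]) -> ContentHash {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    fn add(&mut self, from: PeerId, record: Record, quorum: usize) -> GetOutcome {
        let hash = Self::content_hash(&record.value);
        // Group the copy under its content hash and count distinct responders.
        let entry = self
            .result_map
            .entry(hash)
            .or_insert_with(|| (record, HashSet::new()));
        let _ = entry.1.insert(from);
        let copies = entry.1.len();
        let candidate = entry.0.clone();

        if copies >= quorum {
            if self.result_map.len() == 1 {
                GetOutcome::Done(candidate)
            } else {
                GetOutcome::SplitRecord
            }
        } else {
            GetOutcome::Pending
        }
    }
}

fn main() {
    let mut acc = Accumulator::default();
    let record = Record { key: b"key".to_vec(), value: b"v1".to_vec() };
    let quorum = 2;

    assert!(matches!(acc.add(1, record.clone(), quorum), GetOutcome::Pending));
    // A second peer returning identical content satisfies the quorum.
    assert!(matches!(acc.add(2, record, quorum), GetOutcome::Done(_)));
    println!("quorum reached for a single content version");
}
```

Keeping the whole record next to the set of peers that returned it is what allows the split-record error in the diff to carry every conflicting version back to the caller.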
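
Similarly, the `Timeout` branch of `handle_get_record_error` above reduces to a three-way decision: report a timeout for a split result map, salvage the record if a single content version already has enough copies (subject to the optional target check), otherwise report a timeout. The sketch below uses the same kind of simplified stand-in types and a hypothetical `on_timeout` helper; it illustrates the decision, not the crate's API.

```rust
use std::collections::{HashMap, HashSet};

type PeerId = u64;
type ContentHash = u64;

#[derive(Clone, Debug, PartialEq)]
struct Record(Vec<u8>);

#[derive(Debug)]
enum GetError {
    QueryTimeout,
    RecordDoesNotMatch(Record),
}

// `target` models an optional expected record: if set, a salvaged record must match it.
fn on_timeout(
    result_map: &HashMap<ContentHash, (Record, HashSet<PeerId>)>,
    quorum: usize,
    target: Option<&Record>,
) -> Result<Record, GetError> {
    // A split result map is not resolved here; the caller is expected to retry.
    if result_map.len() > 1 {
        return Err(GetError::QueryTimeout);
    }
    // A single version with enough copies is returned despite the timeout.
    if let Some((record, peers)) = result_map.values().next() {
        if peers.len() >= quorum {
            return match target {
                Some(expected) if expected != record => {
                    Err(GetError::RecordDoesNotMatch(record.clone()))
                }
                _ => Ok(record.clone()),
            };
        }
    }
    Err(GetError::QueryTimeout)
}

fn main() {
    let mut result_map = HashMap::new();
    let record = Record(b"value".to_vec());
    result_map.insert(1u64, (record.clone(), HashSet::from([1u64, 2u64])));

    // Enough copies of one version: the record is salvaged despite the timeout.
    assert!(on_timeout(&result_map, 2, Some(&record)).is_ok());
    // Not enough copies: the timeout is reported.
    assert!(matches!(on_timeout(&result_map, 3, None), Err(GetError::QueryTimeout)));
    println!("timeout handling behaved as sketched");
}
```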