From 17d292b2c2a2700a38b2a68ed69833c8ac81985a Mon Sep 17 00:00:00 2001
From: Josh Wilson
Date: Mon, 8 Apr 2024 10:51:02 +0900
Subject: [PATCH] feat(networking): shift to use ilog2 bucket distance for close data calcs

ilog2 is about magnitude rather than specifics. As distance is already being
estimated, we can use this to easily render a buffer zone of data we
replicate vs a close bucket which we deem to be our responsibility.
---
 sn_networking/src/cmd.rs                 |  9 ++--
 sn_networking/src/driver.rs              | 16 ++++----
 sn_networking/src/event.rs               |  6 +--
 sn_networking/src/lib.rs                 |  4 +-
 sn_networking/src/record_store.rs        | 52 ++++++++++++++----------
 sn_networking/src/record_store_api.rs    | 10 ++---
 sn_networking/src/replication_fetcher.rs | 17 ++++----
 sn_node/src/replication.rs               |  4 +-
 8 files changed, 65 insertions(+), 53 deletions(-)

diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs
index 355d3d276d..ec7d6bea75 100644
--- a/sn_networking/src/cmd.rs
+++ b/sn_networking/src/cmd.rs
@@ -10,7 +10,7 @@ use crate::{
     driver::{PendingGetClosestType, SwarmDriver},
     error::{NetworkError, Result},
     multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
-    REPLICATE_RANGE,
+    REPLICATION_PEERS_COUNT,
 };
 use libp2p::{
     kad::{store::RecordStore, Quorum, Record, RecordKey},
@@ -479,9 +479,10 @@ impl SwarmDriver {
                     .behaviour_mut()
                     .kademlia
                     .store_mut()
-                    .get_distance_range()
+                    .get_farthest_replication_distance_bucket()
                 {
-                    self.replication_fetcher.set_distance_range(distance);
+                    self.replication_fetcher
+                        .set_replication_distance_range(distance);
                 }
 
                 if let Err(err) = result {
@@ -818,7 +819,7 @@ impl SwarmDriver {
                 let replicate_targets = closest_k_peers
                     .into_iter()
                     // add some leeway to allow for divergent knowledge
-                    .take(REPLICATE_RANGE)
+                    .take(REPLICATION_PEERS_COUNT)
                     .collect::<Vec<_>>();
 
                 let all_records: Vec<_> = self
diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs
index 978e2eaa95..c50c848568 100644
--- a/sn_networking/src/driver.rs
+++ b/sn_networking/src/driver.rs
@@ -28,7 +28,6 @@ use crate::{
     transport, Network, CLOSE_GROUP_SIZE,
 };
 use futures::StreamExt;
-use libp2p::kad::KBucketDistance as Distance;
 #[cfg(feature = "local-discovery")]
 use libp2p::mdns;
 use libp2p::{
@@ -609,8 +608,10 @@ impl SwarmDriver {
                 if let Some(distance) = self.get_farthest_data_address_estimate(&closest_k_peers) {
                     // set any new distance to farthest record in the store
                     self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
+
+                    let replication_distance = self.swarm.behaviour_mut().kademlia.store_mut().get_farthest_replication_distance_bucket().unwrap_or(1);
                     // the distance range within the replication_fetcher shall be in sync as well
-                    self.replication_fetcher.set_distance_range(distance);
+                    self.replication_fetcher.set_replication_distance_range(replication_distance);
                 }
             }
         }
@@ -622,27 +623,26 @@ impl SwarmDriver {
     // ---------- Crate helpers -------------------
     // --------------------------------------------
 
-    /// Return a far address, close to but probably farther than our responsibilty range.
+    /// Returns the farthest bucket, close to but probably farther than our responsibility range.
     /// This simply uses the closest k peers to estimate the farthest address as
-    /// `K_VALUE`th peer's address distance.
+    /// `K_VALUE`th peer's bucket.
     fn get_farthest_data_address_estimate(
         &mut self,
         // Sorted list of closest k peers to our peer id.
         closest_k_peers: &[PeerId],
-    ) -> Option<Distance> {
+    ) -> Option<u32> {
         // if we don't have enough peers we don't set the distance range yet.
         let mut farthest_distance = None;
 
         let our_address = NetworkAddress::from_peer(self.self_peer_id);
 
-        // get K_VALUE/2 peer's address distance
+        // get K_VALUEth peer's address distance
         // This is a rough estimate of the farthest address we might be responsible for.
         // We want this to be higher than actually necessary, so we retain more data
         // and can be sure to pass bad node checks
         if let Some(peer) = closest_k_peers.last() {
             let address = NetworkAddress::from_peer(*peer);
-            let distance = our_address.distance(&address);
-            farthest_distance = Some(distance);
+            farthest_distance = our_address.distance(&address).ilog2();
         }
 
         farthest_distance
diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs
index 45c4d95b7f..2cde322462 100644
--- a/sn_networking/src/event.rs
+++ b/sn_networking/src/event.rs
@@ -12,7 +12,7 @@ use crate::{
     error::{NetworkError, Result},
     multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address,
     target_arch::Instant,
-    CLOSE_GROUP_SIZE, REPLICATE_RANGE,
+    CLOSE_GROUP_SIZE, REPLICATION_PEERS_COUNT,
 };
 use core::fmt;
 use custom_debug::Debug as CustomDebug;
@@ -1310,12 +1310,12 @@ impl SwarmDriver {
         target: &NetworkAddress,
         all_peers: &Vec<PeerId>,
     ) -> bool {
-        if all_peers.len() <= REPLICATE_RANGE {
+        if all_peers.len() <= REPLICATION_PEERS_COUNT {
            return true;
         }
 
         // Margin of 2 to allow our RT being bit lagging.
-        match sort_peers_by_address(all_peers, target, REPLICATE_RANGE) {
+        match sort_peers_by_address(all_peers, target, REPLICATION_PEERS_COUNT) {
             Ok(close_group) => close_group.contains(&our_peer_id),
             Err(err) => {
                 warn!("Could not get sorted peers for {target:?} with error {err:?}");
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index 31a6d4523f..62046b2484 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -82,9 +82,9 @@ pub type PayeeQuote = (PeerId, MainPubkey, PaymentQuote);
 /// The size has been set to 5 for improved performance.
 pub const CLOSE_GROUP_SIZE: usize = 5;
 
-/// The range of peers that will be considered as close to a record target,
+/// The count of peers that will be considered as close to a record target,
 /// that a replication of the record shall be sent/accepted to/by the peer.
-pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;
+pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2;
 
 /// Majority of a given group (i.e. > 1/2).
 #[inline]
diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs
index c07579d1e7..deb19abc27 100644
--- a/sn_networking/src/record_store.rs
+++ b/sn_networking/src/record_store.rs
@@ -19,7 +19,7 @@ use libp2p::{
     identity::PeerId,
     kad::{
         store::{Error, RecordStore, Result},
-        KBucketDistance as Distance, KBucketKey, ProviderRecord, Record, RecordKey as Key,
+        KBucketKey, ProviderRecord, Record, RecordKey as Key,
     },
 };
 #[cfg(feature = "open-metrics")]
@@ -58,9 +58,10 @@ pub struct NodeRecordStore {
     network_event_sender: mpsc::Sender<NetworkEvent>,
     /// Send cmds to the network layer. Used to interact with self in an async fashion.
     swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
-    /// Distance range specify the acceptable range of record entry.
+    /// ilog2 distance range of responsible records
+    /// AKA: how many buckets of data do we consider "close"
     /// None means accept all records.
-    distance_range: Option<Distance>,
+    responsible_distance_range: Option<u32>,
     #[cfg(feature = "open-metrics")]
     /// Used to report the number of records held by the store to the metrics server.
     record_count_metric: Option<Gauge>,
@@ -195,7 +196,7 @@ impl NodeRecordStore {
             records,
             network_event_sender,
             swarm_cmd_sender,
-            distance_range: None,
+            responsible_distance_range: None,
             #[cfg(feature = "open-metrics")]
             record_count_metric: None,
             received_payment_count: 0,
@@ -211,9 +212,9 @@ impl NodeRecordStore {
         self
     }
 
-    /// Returns the current distance range
-    pub fn get_distance_range(&self) -> Option<Distance> {
-        self.distance_range
+    /// Returns the current distance ilog2 (aka bucket) range of CLOSE_GROUP nodes.
+    pub fn get_responsible_distance_range(&self) -> Option<u32> {
+        self.responsible_distance_range
     }
 
     // Converts a Key into a Hex string.
@@ -438,7 +439,7 @@ impl NodeRecordStore {
             live_time: self.timestamp.elapsed().as_secs(),
         };
 
-        if let Some(distance_range) = self.distance_range {
+        if let Some(distance_range) = self.responsible_distance_range {
             let relevant_records =
                 self.get_records_within_distance_range(record_keys_as_hashset, distance_range);
 
@@ -474,7 +475,7 @@ impl NodeRecordStore {
     pub fn get_records_within_distance_range(
         &self,
         records: HashSet<&Key>,
-        distance_range: Distance,
+        distance_range: u32,
     ) -> usize {
         debug!(
             "Total record count is {:?}. Distance is: {distance_range:?}",
@@ -485,7 +486,7 @@ impl NodeRecordStore {
             .iter()
             .filter(|key| {
                 let kbucket_key = KBucketKey::new(key.to_vec());
-                distance_range >= self.local_key.distance(&kbucket_key)
+                distance_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0)
             })
             .count();
 
@@ -494,8 +495,8 @@ impl NodeRecordStore {
     }
 
     /// Setup the distance range.
-    pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
-        self.distance_range = Some(distance_range);
+    pub(crate) fn set_responsible_distance_range(&mut self, farthest_responsible_bucket: u32) {
+        self.responsible_distance_range = Some(farthest_responsible_bucket);
     }
 }
 
@@ -730,7 +731,7 @@ pub fn calculate_cost_for_records(quoting_metrics: &QuotingMetrics) -> u64 {
 mod tests {
     use super::*;
 
-    use crate::{close_group_majority, sort_peers_by_key, REPLICATE_RANGE};
+    use crate::{close_group_majority, sort_peers_by_key, REPLICATION_PEERS_COUNT};
     use bytes::Bytes;
     use eyre::ContextCompat;
     use libp2p::{core::multihash::Multihash, kad::RecordKey};
@@ -1076,7 +1077,7 @@ mod tests {
 
     #[tokio::test]
     #[allow(clippy::mutable_key_type)]
-    async fn get_records_within_distance_range() -> eyre::Result<()> {
+    async fn get_records_within_bucket_range() -> eyre::Result<()> {
         let max_records = 50;
 
         let temp_dir = std::env::temp_dir();
@@ -1140,16 +1141,21 @@ mod tests {
                 .wrap_err("Could not parse record store key")?,
         );
         // get the distance to this record from our local key
-        let distance = self_address.distance(&halfway_record_address);
+        let distance = self_address
+            .distance(&halfway_record_address)
+            .ilog2()
+            .unwrap_or(0);
 
-        store.set_distance_range(distance);
+        // must be plus one bucket from the halfway record
+        store.set_responsible_distance_range(distance);
 
         let record_keys = store.records.keys().collect();
 
-        // check that the number of records returned is correct
-        assert_eq!(
-            store.get_records_within_distance_range(record_keys, distance),
-            stored_records.len() / 2
+        // check that the number of records returned is larger than half our records
+        // (ie, that we cover _at least_ all the records within our distance range)
+        assert!(
+            store.get_records_within_distance_range(record_keys, distance)
+                >= stored_records.len() / 2
         );
 
         Ok(())
@@ -1177,7 +1183,11 @@
         for _ in 0..num_of_chunks_per_itr {
             let name = xor_name::rand::random();
             let address = NetworkAddress::from_chunk_address(ChunkAddress::new(name));
-            match sort_peers_by_key(&peers_vec, &address.as_kbucket_key(), REPLICATE_RANGE) {
+            match sort_peers_by_key(
+                &peers_vec,
+                &address.as_kbucket_key(),
+                REPLICATION_PEERS_COUNT,
+            ) {
                 Ok(peers_in_replicate_range) => {
                     let peers_in_replicate_range: Vec<PeerId> = peers_in_replicate_range
                         .iter()
diff --git a/sn_networking/src/record_store_api.rs b/sn_networking/src/record_store_api.rs
index c04ab564b8..08d7074eaa 100644
--- a/sn_networking/src/record_store_api.rs
+++ b/sn_networking/src/record_store_api.rs
@@ -10,7 +10,7 @@
 use crate::record_store::{ClientRecordStore, NodeRecordStore};
 use libp2p::kad::{
     store::{RecordStore, Result},
-    KBucketDistance as Distance, ProviderRecord, Record, RecordKey,
+    ProviderRecord, Record, RecordKey,
 };
 use sn_protocol::{storage::RecordType, NetworkAddress};
 use sn_transfers::{NanoTokens, QuotingMetrics};
@@ -131,22 +131,22 @@ impl UnifiedRecordStore {
         }
     }
 
-    pub(crate) fn get_distance_range(&self) -> Option<Distance> {
+    pub(crate) fn get_farthest_replication_distance_bucket(&self) -> Option<u32> {
         match self {
             Self::Client(_store) => {
                 warn!("Calling get_distance_range at Client. This should not happen");
                 None
             }
-            Self::Node(store) => store.get_distance_range(),
+            Self::Node(store) => store.get_responsible_distance_range(),
         }
     }
 
-    pub(crate) fn set_distance_range(&mut self, distance: Distance) {
+    pub(crate) fn set_distance_range(&mut self, distance: u32) {
         match self {
             Self::Client(_store) => {
                 warn!("Calling set_distance_range at Client. This should not happen");
             }
-            Self::Node(store) => store.set_distance_range(distance),
+            Self::Node(store) => store.set_responsible_distance_range(distance),
         }
     }
diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs
index c3188bc5d1..046b3d693c 100644
--- a/sn_networking/src/replication_fetcher.rs
+++ b/sn_networking/src/replication_fetcher.rs
@@ -10,7 +10,7 @@
 use crate::target_arch::spawn;
 use crate::{event::NetworkEvent, target_arch::Instant};
 use libp2p::{
-    kad::{KBucketDistance as Distance, RecordKey, K_VALUE},
+    kad::{RecordKey, K_VALUE},
     PeerId,
 };
 use sn_protocol::{storage::RecordType, NetworkAddress, PrettyPrintRecordKey};
@@ -41,8 +41,8 @@ pub(crate) struct ReplicationFetcher {
     // Avoid fetching same chunk from different nodes AND carry out too many parallel tasks.
     on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>,
     event_sender: mpsc::Sender<NetworkEvent>,
-    // Distance range that the incoming key shall be fetched
-    distance_range: Option<Distance>,
+    /// ilog2 bucket distance range that the incoming key shall be fetched
+    distance_range: Option<u32>,
 }
 
 impl ReplicationFetcher {
@@ -58,7 +58,7 @@ impl ReplicationFetcher {
     }
 
     /// Set the distance range.
-    pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
+    pub(crate) fn set_replication_distance_range(&mut self, distance_range: u32) {
         self.distance_range = Some(distance_range);
     }
 
@@ -102,7 +102,8 @@ impl ReplicationFetcher {
         let self_address = NetworkAddress::from_peer(self.self_peer_id);
 
         incoming_keys.retain(|(addr, _record_type)| {
-            let is_in_range = self_address.distance(addr) <= *distance_range;
+            let is_in_range =
+                self_address.distance(addr).ilog2().unwrap_or(0) <= *distance_range;
             if !is_in_range {
                 out_of_range_keys.push(addr.clone());
             }
@@ -368,8 +369,8 @@ mod tests {
 
         // Set distance range
         let distance_target = NetworkAddress::from_peer(PeerId::random());
-        let distance_range = self_address.distance(&distance_target);
-        replication_fetcher.set_distance_range(distance_range);
+        let distance_range = self_address.distance(&distance_target).ilog2().unwrap_or(1);
+        replication_fetcher.set_replication_distance_range(distance_range);
 
         let mut incoming_keys = Vec::new();
         let mut in_range_keys = 0;
@@ -377,7 +378,7 @@ mod tests {
             let random_data: Vec<u8> = (0..50).map(|_| rand::random::<u8>()).collect();
             let key = NetworkAddress::from_record_key(&RecordKey::from(random_data));
 
-            if key.distance(&self_address) <= distance_range {
+            if key.distance(&self_address).ilog2().unwrap_or(0) <= distance_range {
                 in_range_keys += 1;
             }
 
diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs
index 8d500e0ee0..4de65952ff 100644
--- a/sn_node/src/replication.rs
+++ b/sn_node/src/replication.rs
@@ -11,7 +11,7 @@ use libp2p::{
     kad::{Quorum, Record, RecordKey},
     PeerId,
 };
-use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATE_RANGE};
+use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATION_PEERS_COUNT};
 use sn_protocol::{
     messages::{Cmd, Query, QueryResponse, Request, Response},
     storage::RecordType,
@@ -150,7 +150,7 @@ impl Node {
                 let sorted_based_on_addr = match sort_peers_by_address(
                     &closest_k_peers,
                     &data_addr,
-                    REPLICATE_RANGE,
+                    REPLICATION_PEERS_COUNT,
                 ) {
                     Ok(result) => result,
                     Err(err) => {
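
Reviewer note, not part of the patch: a minimal, self-contained sketch of the "buffer zone vs responsible
bucket" idea from the commit message, using plain u128 XOR distances in place of libp2p's 256-bit
KBucketDistance. The names bucket_of, is_responsible and should_replicate, and the example bucket numbers,
are illustrative assumptions rather than APIs from this codebase; u128::checked_ilog2 simply mirrors the
KBucketDistance::ilog2() -> Option<u32> call the patch relies on.

// XOR distance reduced to its ilog2 "bucket"; None when the ids are identical
// (distance 0 has no ilog2), just like KBucketDistance::ilog2().
fn bucket_of(a: u128, b: u128) -> Option<u32> {
    (a ^ b).checked_ilog2()
}

// Data we deem our responsibility: at or below the farthest responsible bucket.
fn is_responsible(our_id: u128, key: u128, responsible_bucket: u32) -> bool {
    bucket_of(our_id, key).map_or(true, |bucket| bucket <= responsible_bucket)
}

// Buffer zone: replication accepts keys slightly beyond the responsible range.
fn should_replicate(our_id: u128, key: u128, replication_bucket: u32) -> bool {
    bucket_of(our_id, key).map_or(true, |bucket| bucket <= replication_bucket)
}

fn main() {
    let our_id = 0b1010_0000u128;
    let responsible_bucket = 4; // e.g. estimated from the K_VALUEth closest peer
    let replication_bucket = responsible_bucket + 1; // one bucket of leeway

    for key in [0b1010_0001u128, 0b1010_1100, 0b1110_0000] {
        println!(
            "key {key:#010b}: bucket {:?}, responsible: {}, replicate: {}",
            bucket_of(our_id, key),
            is_responsible(our_id, key, responsible_bucket),
            should_replicate(our_id, key, replication_bucket),
        );
    }
}

Working in whole buckets rather than exact distances is what lets the replication range sit one notch wider
than the responsibility range, as the example's replication_bucket = responsible_bucket + 1 illustrates.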