Skip to content

Commit

Permalink
chore: avoid access Distance private field of U256
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Dec 11, 2024
1 parent 111b635 commit 8e4335e
Show file tree
Hide file tree
Showing 20 changed files with 225 additions and 379 deletions.
455 changes: 130 additions & 325 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ant-evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ external-signer = ["evmlib/external-signer"]
test-utils = []

[dependencies]
alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] }
custom_debug = "~0.6.1"
evmlib = { path = "../evmlib", version = "0.1.4" }
hex = "~0.4.3"
lazy_static = "~1.4.0"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "master", features = ["identify", "kad"] }
libp2p = { version = "0.54.1", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
ring = "0.17.8"
rmp-serde = "1.1.1"
Expand Down
15 changes: 12 additions & 3 deletions ant-evm/src/data_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{AttoTokens, EvmError};
use alloy::primitives::U256;
use evmlib::common::TxHash;
use evmlib::{
common::{Address as RewardsAddress, QuoteHash},
utils::dummy_address,
};
use libp2p::{identity::PublicKey, PeerId};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter, Result as FmtResult};
#[cfg(not(target_arch = "wasm32"))]
pub use std::time::SystemTime;
#[cfg(target_arch = "wasm32")]
Expand Down Expand Up @@ -43,9 +45,7 @@ impl ProofOfPayment {
}

/// Quoting metrics that got used to generate a quote, or to track peer's status.
#[derive(
Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize, custom_debug::Debug,
)]
#[derive(Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct QuotingMetrics {
/// the records stored
pub close_records_stored: usize,
Expand All @@ -62,6 +62,15 @@ pub struct QuotingMetrics {
pub network_size: Option<u64>,
}

impl Debug for QuotingMetrics {
fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
let density_u256 = self.network_density.map(U256::from_be_bytes);

write!(formatter, "QuotingMetrics {{ close_records_stored: {}, max_records: {}, received_payment_count: {}, live_time: {}, network_density: {density_u256:?}, network_size: {:?} }}",
self.close_records_stored, self.max_records, self.received_payment_count, self.live_time, self.network_size)
}
}

impl QuotingMetrics {
/// construct an empty QuotingMetrics
pub fn new() -> Self {
Expand Down
5 changes: 3 additions & 2 deletions ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ upnp = ["libp2p/upnp"]

[dependencies]
aes-gcm-siv = "0.11.1"
alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] }
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" }
ant-build-info = { path = "../ant-build-info", version = "0.1.19" }
ant-evm = { path = "../ant-evm", version = "0.1.4" }
Expand All @@ -39,7 +40,7 @@ hyper = { version = "0.14", features = [
], optional = true }
itertools = "~0.12.1"
lazy_static = "~1.4.0"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "master", features = [
libp2p = { version = "0.54.1", features = [
"tokio",
"dns",
"kad",
Expand Down Expand Up @@ -95,7 +96,7 @@ crate-type = ["cdylib", "rlib"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "master", features = [
libp2p = { version = "0.54.1", features = [
"tokio",
"dns",
"kad",
Expand Down
7 changes: 5 additions & 2 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ use crate::{
log_markers::Marker,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
};
use alloy::primitives::U256;
use ant_evm::{AttoTokens, PaymentQuote, QuotingMetrics};
use ant_protocol::{
convert_distance_to_u256,
messages::{Cmd, Request, Response},
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
Expand Down Expand Up @@ -1138,11 +1140,12 @@ impl SwarmDriver {
}

/// Returns the nodes that within the defined distance.
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: U256) -> Vec<PeerId> {
peers
.iter()
.filter_map(|peer_id| {
let distance = address.distance(&NetworkAddress::from_peer(*peer_id));
let distance =
convert_distance_to_u256(&address.distance(&NetworkAddress::from_peer(*peer_id)));
if distance <= range {
Some(*peer_id)
} else {
Expand Down
17 changes: 10 additions & 7 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ use crate::{
use crate::{
metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use alloy::primitives::U256;
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::PaymentQuote;
use ant_protocol::{
convert_distance_to_u256,
messages::{ChunkProof, Nonce, Request, Response},
storage::{try_deserialize_record, RetryStrategy},
version::{
Expand All @@ -48,7 +50,7 @@ use libp2p::mdns;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad::{self, KBucketDistance as Distance, QueryId, Quorum, Record, RecordKey, K_VALUE, U256},
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
Expand Down Expand Up @@ -974,10 +976,9 @@ impl SwarmDriver {
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE);
let density_distance = Distance(estimated_distance);
let density_distance = density * U256::from(CLOSE_GROUP_SIZE);

// Use distanct to close peer to avoid the situation that
// Use distance to close peer to avoid the situation that
// the estimated density_distance is too narrow.
let closest_k_peers = self.get_closest_k_value_local_peers();
if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
Expand All @@ -987,9 +988,12 @@ impl SwarmDriver {
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let close_peers_distance = self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE + 1]));
let close_peers_u256 = convert_distance_to_u256(&close_peers_distance);

let distance = std::cmp::max(density_distance, close_peers_distance);
let distance = std::cmp::max(density_distance, close_peers_u256);

// The sampling approach has severe impact to the node side performance
// Hence suggested to be only used by client side.
// let distance = if let Some(distance) = self.network_density_samples.get_median() {
// distance
// } else {
Expand All @@ -1003,10 +1007,9 @@ impl SwarmDriver {
// // Note: self is included
// let self_addr = NetworkAddress::from_peer(self.self_peer_id);
// self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))

// };

info!("Set responsible range to {distance:?}({:?})", distance.ilog2());
info!("Set responsible range to {distance:?}({:?})", distance.log2());

// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
Expand Down
26 changes: 15 additions & 11 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use aes_gcm_siv::{
aead::{Aead, KeyInit},
Aes256GcmSiv, Key as AesKey, Nonce,
};
use alloy::primitives::U256;
use ant_evm::{AttoTokens, QuotingMetrics};
use ant_protocol::{
convert_distance_to_u256,
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -145,7 +147,7 @@ pub struct NodeRecordStore {
/// Main records store remains unchanged for compatibility
records: HashMap<Key, (NetworkAddress, RecordType)>,
/// Additional index organizing records by distance
records_by_distance: BTreeMap<Distance, Key>,
records_by_distance: BTreeMap<U256, Key>,
/// FIFO simple cache of records to reduce read times
records_cache: RecordCache,
/// Send network events to the node layer.
Expand All @@ -155,7 +157,7 @@ pub struct NodeRecordStore {
/// ilog2 distance range of responsible records
/// AKA: how many buckets of data do we consider "close"
/// None means accept all records.
responsible_distance_range: Option<Distance>,
responsible_distance_range: Option<U256>,
#[cfg(feature = "open-metrics")]
/// Used to report the number of records held by the store to the metrics server.
record_count_metric: Option<Gauge>,
Expand Down Expand Up @@ -374,9 +376,9 @@ impl NodeRecordStore {
let local_address = NetworkAddress::from_peer(local_id);

// Initialize records_by_distance
let mut records_by_distance: BTreeMap<Distance, Key> = BTreeMap::new();
let mut records_by_distance: BTreeMap<U256, Key> = BTreeMap::new();
for (key, (addr, _record_type)) in records.iter() {
let distance = local_address.distance(addr);
let distance = convert_distance_to_u256(&local_address.distance(addr));
let _ = records_by_distance.insert(distance, key.clone());
}

Expand Down Expand Up @@ -413,7 +415,7 @@ impl NodeRecordStore {
}

/// Returns the current distance ilog2 (aka bucket) range of CLOSE_GROUP nodes.
pub fn get_responsible_distance_range(&self) -> Option<Distance> {
pub fn get_responsible_distance_range(&self) -> Option<U256> {
self.responsible_distance_range
}

Expand Down Expand Up @@ -615,13 +617,14 @@ impl NodeRecordStore {
pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: RecordType) {
let addr = NetworkAddress::from_record_key(&key);
let distance = self.local_address.distance(&addr);
let distance_u256 = convert_distance_to_u256(&distance);

// Update main records store
self.records
.insert(key.clone(), (addr.clone(), record_type));

// Update bucket index
let _ = self.records_by_distance.insert(distance, key.clone());
let _ = self.records_by_distance.insert(distance_u256, key.clone());

// Update farthest record if needed (unchanged)
if let Some((_farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
Expand Down Expand Up @@ -751,7 +754,7 @@ impl NodeRecordStore {
let relevant_records = self.get_records_within_distance_range(distance_range);

// The `responsible_range` is the network density
quoting_metrics.network_density = Some(distance_range.0.into());
quoting_metrics.network_density = Some(distance_range.to_be_bytes());

quoting_metrics.close_records_stored = relevant_records;
} else {
Expand All @@ -777,7 +780,7 @@ impl NodeRecordStore {
}

/// Calculate how many records are stored within a distance range
pub fn get_records_within_distance_range(&self, range: Distance) -> usize {
pub fn get_records_within_distance_range(&self, range: U256) -> usize {
let within_range = self
.records_by_distance
.range(..range)
Expand All @@ -790,7 +793,7 @@ impl NodeRecordStore {
}

/// Setup the distance range.
pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: Distance) {
pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: U256) {
self.responsible_distance_range = Some(responsible_distance);
}
}
Expand Down Expand Up @@ -886,7 +889,7 @@ impl RecordStore for NodeRecordStore {
fn remove(&mut self, k: &Key) {
// Remove from main store
if let Some((addr, _)) = self.records.remove(k) {
let distance = self.local_address.distance(&addr);
let distance = convert_distance_to_u256(&self.local_address.distance(&addr));
let _ = self.records_by_distance.remove(&distance);
}

Expand Down Expand Up @@ -1035,6 +1038,7 @@ mod tests {

use ant_evm::utils::dummy_address;
use ant_evm::{PaymentQuote, RewardsAddress};
use ant_protocol::convert_distance_to_u256;
use ant_protocol::storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, Scratchpad,
};
Expand Down Expand Up @@ -1677,7 +1681,7 @@ mod tests {
.wrap_err("Could not parse record store key")?,
);
// get the distance to this record from our local key
let distance = self_address.distance(&halfway_record_address);
let distance = convert_distance_to_u256(&self_address.distance(&halfway_record_address));

// must be plus one bucket from the halfway record
store.set_responsible_distance_range(distance);
Expand Down
7 changes: 4 additions & 3 deletions ant-networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
#![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress

use crate::record_store::{ClientRecordStore, NodeRecordStore};
use alloy::primitives::U256;
use ant_evm::{AttoTokens, QuotingMetrics};
use ant_protocol::{storage::RecordType, NetworkAddress};
use libp2p::kad::{
store::{RecordStore, Result},
KBucketDistance as Distance, ProviderRecord, Record, RecordKey,
ProviderRecord, Record, RecordKey,
};
use std::{borrow::Cow, collections::HashMap};

Expand Down Expand Up @@ -134,7 +135,7 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn get_farthest_replication_distance(&self) -> Option<Distance> {
pub(crate) fn get_farthest_replication_distance(&self) -> Option<U256> {
match self {
Self::Client(_store) => {
warn!("Calling get_distance_range at Client. This should not happen");
Expand All @@ -144,7 +145,7 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn set_distance_range(&mut self, distance: Distance) {
pub(crate) fn set_distance_range(&mut self, distance: U256) {
match self {
Self::Client(_store) => {
warn!("Calling set_distance_range at Client. This should not happen");
Expand Down
17 changes: 11 additions & 6 deletions ant-networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

use crate::target_arch::spawn;
use crate::{event::NetworkEvent, target_arch::Instant};
use ant_protocol::{storage::RecordType, NetworkAddress, PrettyPrintRecordKey};
use alloy::primitives::U256;
use ant_protocol::{
convert_distance_to_u256, storage::RecordType, NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
kad::{KBucketDistance as Distance, RecordKey, K_VALUE},
PeerId,
Expand Down Expand Up @@ -42,7 +45,7 @@ pub(crate) struct ReplicationFetcher {
on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>,
event_sender: mpsc::Sender<NetworkEvent>,
/// Distance range that the incoming key shall be fetched
distance_range: Option<Distance>,
distance_range: Option<U256>,
/// Restrict fetch range to closer than this value
/// used when the node is full, but we still have "close" data coming in
/// that is _not_ closer than our farthest max record
Expand All @@ -63,7 +66,7 @@ impl ReplicationFetcher {
}

/// Set the distance range.
pub(crate) fn set_replication_distance_range(&mut self, distance_range: Distance) {
pub(crate) fn set_replication_distance_range(&mut self, distance_range: U256) {
self.distance_range = Some(distance_range);
}

Expand Down Expand Up @@ -136,7 +139,8 @@ impl ReplicationFetcher {
// Filter out those out_of_range ones among the incoming_keys.
if let Some(ref distance_range) = self.distance_range {
new_incoming_keys.retain(|(addr, _record_type)| {
let is_in_range = self_address.distance(addr) <= *distance_range;
let is_in_range =
convert_distance_to_u256(&self_address.distance(addr)) <= *distance_range;
if !is_in_range {
out_of_range_keys.push(addr.clone());
}
Expand Down Expand Up @@ -408,7 +412,7 @@ impl ReplicationFetcher {
#[cfg(test)]
mod tests {
use super::{ReplicationFetcher, FETCH_TIMEOUT, MAX_PARALLEL_FETCH};
use ant_protocol::{storage::RecordType, NetworkAddress};
use ant_protocol::{convert_distance_to_u256, storage::RecordType, NetworkAddress};
use eyre::Result;
use libp2p::{kad::RecordKey, PeerId};
use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -479,7 +483,8 @@ mod tests {
// Set distance range
let distance_target = NetworkAddress::from_peer(PeerId::random());
let distance_range = self_address.distance(&distance_target);
replication_fetcher.set_replication_distance_range(distance_range);
let distance_256 = convert_distance_to_u256(&distance_range);
replication_fetcher.set_replication_distance_range(distance_256);

let mut incoming_keys = Vec::new();
let mut in_range_keys = 0;
Expand Down
2 changes: 1 addition & 1 deletion ant-node-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ colored = "2.0.4"
color-eyre = "~0.6"
dirs-next = "2.0.0"
indicatif = { version = "0.17.5", features = ["tokio"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "master", features = [] }
libp2p = { version = "0.54.1", features = [] }
libp2p-identity = { version = "0.2.7", features = ["rand"] }
prost = { version = "0.9" }
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion ant-node-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ bls = { package = "blsttc", version = "8.0.1" }
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = "0.6.2"
hex = "~0.4.3"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "master", features = ["kad"]}
libp2p = { version = "0.54.1", features = ["kad"]}
libp2p-identity = { version="0.2.7", features = ["rand"] }
thiserror = "1.0.23"
# # watch out updating this, protoc compiler needs to be installed on all build systems
Expand Down
Loading

0 comments on commit 8e4335e

Please sign in to comment.