Skip to content

Commit

Permalink
Merge branch 'main' into local_as_runtime_opt
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach authored Jan 7, 2025
2 parents e7ea9bc + ac85238 commit fb410ce
Show file tree
Hide file tree
Showing 20 changed files with 190 additions and 116 deletions.
12 changes: 6 additions & 6 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256};
use ant_protocol::{
convert_distance_to_u256,
messages::{Cmd, Request, Response},
storage::{RecordHeader, RecordKind, RecordType},
storage::{RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
Expand Down Expand Up @@ -92,7 +92,7 @@ pub enum LocalSwarmCmd {
},
/// Get the Addresses of all the Records held locally
GetAllLocalRecordAddresses {
sender: oneshot::Sender<HashMap<NetworkAddress, RecordType>>,
sender: oneshot::Sender<HashMap<NetworkAddress, ValidationType>>,
},
/// Get data from the local RecordStore
GetLocalRecord {
Expand Down Expand Up @@ -120,7 +120,7 @@ pub enum LocalSwarmCmd {
/// This should be done after the record has been stored to disk
AddLocalRecordAsStored {
key: RecordKey,
record_type: RecordType,
record_type: ValidationType,
},
/// Add a peer to the blocklist
AddPeerToBlockList {
Expand All @@ -141,7 +141,7 @@ pub enum LocalSwarmCmd {
quotes: Vec<(PeerId, PaymentQuote)>,
},
// Notify a fetch completion
FetchCompleted((RecordKey, RecordType)),
FetchCompleted((RecordKey, ValidationType)),
/// Triggers interval repliation
/// NOTE: This does result in outgoing messages, but is produced locally
TriggerIntervalReplication,
Expand Down Expand Up @@ -661,13 +661,13 @@ impl SwarmDriver {
let record_type = match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::Chunk => RecordType::Chunk,
RecordKind::Chunk => ValidationType::Chunk,
RecordKind::GraphEntry
| RecordKind::Pointer
| RecordKind::Register
| RecordKind::Scratchpad => {
let content_hash = XorName::from_content(&record.value);
RecordType::NonChunk(content_hash)
ValidationType::NonChunk(content_hash)
}
RecordKind::ChunkWithPayment
| RecordKind::RegisterWithPayment
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use ant_protocol::{
messages::{CmdResponse, Request, Response},
storage::RecordType,
storage::ValidationType,
NetworkAddress,
};
use libp2p::request_response::{self, Message};
Expand Down Expand Up @@ -159,7 +159,7 @@ impl SwarmDriver {
fn add_keys_to_replication_fetcher(
&mut self,
sender: NetworkAddress,
incoming_keys: Vec<(NetworkAddress, RecordType)>,
incoming_keys: Vec<(NetworkAddress, ValidationType)>,
) {
let holder = if let Some(peer_id) = sender.as_peer_id() {
peer_id
Expand Down
6 changes: 3 additions & 3 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
storage::{Pointer, RecordType, RetryStrategy, Scratchpad},
storage::{Pointer, RetryStrategy, Scratchpad, ValidationType},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
Expand Down Expand Up @@ -964,7 +964,7 @@ impl Network {

/// Notify ReplicationFetch a fetch attempt is completed.
/// (but it won't trigger any real writes to disk, say fetched an old version of register)
pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) {
pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
}

Expand Down Expand Up @@ -995,7 +995,7 @@ impl Network {
/// Returns the Addresses of all the locally stored Records
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
) -> Result<HashMap<NetworkAddress, ValidationType>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

Expand Down
56 changes: 28 additions & 28 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use aes_gcm_siv::{
use ant_evm::{QuotingMetrics, U256};
use ant_protocol::{
convert_distance_to_u256,
storage::{RecordHeader, RecordKind, RecordType},
storage::{RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
use hkdf::Hkdf;
Expand Down Expand Up @@ -138,7 +138,7 @@ pub struct NodeRecordStore {
/// The configuration of the store.
config: NodeRecordStoreConfig,
/// Main records store remains unchanged for compatibility
records: HashMap<Key, (NetworkAddress, RecordType)>,
records: HashMap<Key, (NetworkAddress, ValidationType)>,
/// Additional index organizing records by distance
records_by_distance: BTreeMap<U256, Key>,
/// FIFO simple cache of records to reduce read times
Expand Down Expand Up @@ -218,7 +218,7 @@ impl NodeRecordStore {
fn update_records_from_an_existing_store(
config: &NodeRecordStoreConfig,
encryption_details: &(Aes256GcmSiv, [u8; 4]),
) -> HashMap<Key, (NetworkAddress, RecordType)> {
) -> HashMap<Key, (NetworkAddress, ValidationType)> {
let process_entry = |entry: &DirEntry| -> _ {
let path = entry.path();
if path.is_file() {
Expand Down Expand Up @@ -270,10 +270,10 @@ impl NodeRecordStore {
};

let record_type = match RecordHeader::is_record_of_type_chunk(&record) {
Ok(true) => RecordType::Chunk,
Ok(true) => ValidationType::Chunk,
Ok(false) => {
let xorname_hash = XorName::from_content(&record.value);
RecordType::NonChunk(xorname_hash)
ValidationType::NonChunk(xorname_hash)
}
Err(error) => {
warn!(
Expand Down Expand Up @@ -585,22 +585,22 @@ impl NodeRecordStore {

/// Returns the set of `NetworkAddress::RecordKey` held by the store
/// Use `record_addresses_ref` to get a borrowed type
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
self.records
.iter()
.map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone()))
.collect()
}

/// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, ValidationType)> {
&self.records
}

/// The follow up to `put_verified`, this only registers the RecordKey
/// in the RecordStore records set. After this it should be safe
/// to return the record as stored.
pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: RecordType) {
pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: ValidationType) {
let addr = NetworkAddress::from_record_key(&key);
let distance = self.local_address.distance(&addr);
let distance_u256 = convert_distance_to_u256(&distance);
Expand Down Expand Up @@ -648,7 +648,7 @@ impl NodeRecordStore {
///
/// The record is marked as written to disk once `mark_as_stored` is called,
/// this avoids us returning half-written data or registering it as stored before it is.
pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> {
pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> {
let key = &r.key;
let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
debug!("PUTting a verified Record: {record_key:?}");
Expand Down Expand Up @@ -838,11 +838,11 @@ impl RecordStore for NodeRecordStore {
// otherwise shall be passed further to allow different version of nonchunk
// to be detected or updated.
match self.records.get(&record.key) {
Some((_addr, RecordType::Chunk)) => {
Some((_addr, ValidationType::Chunk)) => {
debug!("Chunk {record_key:?} already exists.");
return Ok(());
}
Some((_addr, RecordType::NonChunk(existing_content_hash))) => {
Some((_addr, ValidationType::NonChunk(existing_content_hash))) => {
let content_hash = XorName::from_content(&record.value);
if content_hash == *existing_content_hash {
debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
Expand Down Expand Up @@ -938,27 +938,27 @@ impl RecordStore for NodeRecordStore {
/// A place holder RecordStore impl for the client that does nothing
#[derive(Default, Debug)]
pub struct ClientRecordStore {
empty_record_addresses: HashMap<Key, (NetworkAddress, RecordType)>,
empty_record_addresses: HashMap<Key, (NetworkAddress, ValidationType)>,
}

impl ClientRecordStore {
pub(crate) fn contains(&self, _key: &Key) -> bool {
false
}

pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
HashMap::new()
}

pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, ValidationType)> {
&self.empty_record_addresses
}

pub(crate) fn put_verified(&mut self, _r: Record, _record_type: RecordType) -> Result<()> {
pub(crate) fn put_verified(&mut self, _r: Record, _record_type: ValidationType) -> Result<()> {
Ok(())
}

pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: RecordType) {}
pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: ValidationType) {}
}

impl RecordStore for ClientRecordStore {
Expand Down Expand Up @@ -1093,12 +1093,12 @@ mod tests {
let returned_record_key = returned_record.key.clone();

assert!(store
.put_verified(returned_record, RecordType::Chunk)
.put_verified(returned_record, ValidationType::Chunk)
.is_ok());

// We must also mark the record as stored (which would be triggered after the async write in nodes
// via NetworkEvent::CompletedWrite)
store.mark_as_stored(returned_record_key, RecordType::Chunk);
store.mark_as_stored(returned_record_key, ValidationType::Chunk);

// loop over store.get max_iterations times to ensure async disk write had time to complete.
let max_iterations = 10;
Expand Down Expand Up @@ -1169,7 +1169,7 @@ mod tests {

// Store the chunk using put_verified
assert!(store
.put_verified(record.clone(), RecordType::Chunk)
.put_verified(record.clone(), ValidationType::Chunk)
.is_ok());

// Wait for the async write operation to complete
Expand Down Expand Up @@ -1270,11 +1270,11 @@ mod tests {

// Store the chunk using put_verified
assert!(store
.put_verified(record.clone(), RecordType::Chunk)
.put_verified(record.clone(), ValidationType::Chunk)
.is_ok());

// Mark as stored (simulating the CompletedWrite event)
store.mark_as_stored(record.key.clone(), RecordType::Chunk);
store.mark_as_stored(record.key.clone(), ValidationType::Chunk);

// Verify the chunk is stored
let stored_record = store.get(&record.key);
Expand Down Expand Up @@ -1343,14 +1343,14 @@ mod tests {
assert!(store
.put_verified(
record.clone(),
RecordType::NonChunk(XorName::from_content(&record.value))
ValidationType::NonChunk(XorName::from_content(&record.value))
)
.is_ok());

// Mark as stored (simulating the CompletedWrite event)
store.mark_as_stored(
record.key.clone(),
RecordType::NonChunk(XorName::from_content(&record.value)),
ValidationType::NonChunk(XorName::from_content(&record.value)),
);

// Verify the scratchpad is stored
Expand Down Expand Up @@ -1437,15 +1437,15 @@ mod tests {
};

// Will be stored anyway.
let succeeded = store.put_verified(record, RecordType::Chunk).is_ok();
let succeeded = store.put_verified(record, ValidationType::Chunk).is_ok();

if !succeeded {
failed_records.push(record_key.clone());
println!("failed {:?}", PrettyPrintRecordKey::from(&record_key));
} else {
// We must also mark the record as stored (which would be triggered
// after the async write in nodes via NetworkEvent::CompletedWrite)
store.mark_as_stored(record_key.clone(), RecordType::Chunk);
store.mark_as_stored(record_key.clone(), ValidationType::Chunk);

println!("success sotred len: {:?} ", store.record_addresses().len());
stored_records_at_some_point.push(record_key.clone());
Expand Down Expand Up @@ -1499,7 +1499,7 @@ mod tests {
// now for any stored data. It either shoudl still be stored OR further away than `most_distant_data`
for data in stored_records_at_some_point {
let data_addr = NetworkAddress::from_record_key(&data);
if !sorted_stored_data.contains(&(&data_addr, &RecordType::Chunk)) {
if !sorted_stored_data.contains(&(&data_addr, &ValidationType::Chunk)) {
assert!(
self_address.distance(&data_addr)
> self_address.distance(most_distant_data),
Expand Down Expand Up @@ -1558,10 +1558,10 @@ mod tests {
publisher: None,
expires: None,
};
assert!(store.put_verified(record, RecordType::Chunk).is_ok());
assert!(store.put_verified(record, ValidationType::Chunk).is_ok());
// We must also mark the record as stored (which would be triggered after the async write in nodes
// via NetworkEvent::CompletedWrite)
store.mark_as_stored(record_key.clone(), RecordType::Chunk);
store.mark_as_stored(record_key.clone(), ValidationType::Chunk);

stored_records.push(record_key.clone());
stored_records.sort_by(|a, b| {
Expand Down
12 changes: 7 additions & 5 deletions ant-networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use crate::record_store::{ClientRecordStore, NodeRecordStore};
use ant_evm::{QuotingMetrics, U256};
use ant_protocol::{storage::RecordType, NetworkAddress};
use ant_protocol::{storage::ValidationType, NetworkAddress};
use libp2p::kad::{
store::{RecordStore, Result},
ProviderRecord, Record, RecordKey,
Expand Down Expand Up @@ -90,21 +90,23 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
match self {
Self::Client(store) => store.record_addresses(),
Self::Node(store) => store.record_addresses(),
}
}

pub(crate) fn record_addresses_ref(&self) -> &HashMap<RecordKey, (NetworkAddress, RecordType)> {
pub(crate) fn record_addresses_ref(
&self,
) -> &HashMap<RecordKey, (NetworkAddress, ValidationType)> {
match self {
Self::Client(store) => store.record_addresses_ref(),
Self::Node(store) => store.record_addresses_ref(),
}
}

pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> {
pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> {
match self {
Self::Client(store) => store.put_verified(r, record_type),
Self::Node(store) => store.put_verified(r, record_type),
Expand Down Expand Up @@ -168,7 +170,7 @@ impl UnifiedRecordStore {
/// Mark the record as stored in the store.
/// This adds it to records set, so it can now be retrieved
/// (to be done after writes are finalised)
pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: RecordType) {
pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) {
match self {
Self::Client(store) => store.mark_as_stored(k, record_type),
Self::Node(store) => store.mark_as_stored(k, record_type),
Expand Down
Loading

0 comments on commit fb410ce

Please sign in to comment.