From c565e32d4da641f49f845d1f20234055a5dc256b Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Mon, 6 Jan 2025 12:34:02 +0100 Subject: [PATCH 01/13] refactor(autonomi): remove uploaded metadata The uploaded timestamp caused the archive to change between uploads and thus required the archive to be re-uploaded. --- autonomi/src/client/files/archive.rs | 1 - autonomi/src/client/files/fs_public.rs | 5 ----- 2 files changed, 6 deletions(-) diff --git a/autonomi/src/client/files/archive.rs b/autonomi/src/client/files/archive.rs index 03a82d423a..5ec00bfa13 100644 --- a/autonomi/src/client/files/archive.rs +++ b/autonomi/src/client/files/archive.rs @@ -55,7 +55,6 @@ impl Metadata { .as_secs(); Self { - uploaded: now, created: now, modified: now, size, diff --git a/autonomi/src/client/files/fs_public.rs b/autonomi/src/client/files/fs_public.rs index 60f13d0cb1..92e5e5455b 100644 --- a/autonomi/src/client/files/fs_public.rs +++ b/autonomi/src/client/files/fs_public.rs @@ -194,7 +194,6 @@ pub(crate) fn metadata_from_entry(entry: &walkdir::DirEntry) -> Metadata { entry.path().display() ); return Metadata { - uploaded: 0, created: 0, modified: 0, size: 0, @@ -224,10 +223,6 @@ pub(crate) fn metadata_from_entry(entry: &walkdir::DirEntry) -> Metadata { let modified = unix_time("modified", fs_metadata.modified()); Metadata { - uploaded: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or(Duration::from_secs(0)) - .as_secs(), created, modified, size: fs_metadata.len(), From f13415e4b92f70f8cf7e7c7a5d360244d658032d Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Mon, 6 Jan 2025 12:34:39 +0100 Subject: [PATCH 02/13] refactor(autonomi): use deterministic serialization Use BTreeMap instead of HashMap so serde serializes deterministically. 
--- autonomi/src/client/files/archive.rs | 10 ++++------ autonomi/src/client/files/archive_public.rs | 8 ++++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/autonomi/src/client/files/archive.rs b/autonomi/src/client/files/archive.rs index 5ec00bfa13..18ecd1c735 100644 --- a/autonomi/src/client/files/archive.rs +++ b/autonomi/src/client/files/archive.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use std::{ - collections::HashMap, + collections::BTreeMap, path::{Path, PathBuf}, }; @@ -36,8 +36,6 @@ pub enum RenameError { /// Metadata for a file in an archive. Time values are UNIX timestamps. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Metadata { - /// When the file was (last) uploaded to the network. - pub uploaded: u64, /// File creation time on local file system. See [`std::fs::Metadata::created`] for details per OS. pub created: u64, /// Last file modification time taken from local file system. See [`std::fs::Metadata::modified`] for details per OS. @@ -67,7 +65,7 @@ impl Metadata { /// The data maps are stored within this structure instead of uploading them to the network, keeping the data private. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct PrivateArchive { - map: HashMap, + map: BTreeMap, } impl PrivateArchive { @@ -75,7 +73,7 @@ impl PrivateArchive { /// Note that this does not upload the archive to the network pub fn new() -> Self { Self { - map: HashMap::new(), + map: BTreeMap::new(), } } @@ -129,7 +127,7 @@ impl PrivateArchive { } /// Get the underlying map - pub fn map(&self) -> &HashMap { + pub fn map(&self) -> &BTreeMap { &self.map } diff --git a/autonomi/src/client/files/archive_public.rs b/autonomi/src/client/files/archive_public.rs index 19f1756b8b..dd7cb046e2 100644 --- a/autonomi/src/client/files/archive_public.rs +++ b/autonomi/src/client/files/archive_public.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use std::{ - collections::HashMap, + collections::BTreeMap, path::{Path, PathBuf}, }; @@ -34,7 +34,7 @@ pub type ArchiveAddr = XorName; /// to the network, of which the addresses are stored in this archive. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct PublicArchive { - map: HashMap, + map: BTreeMap, } impl PublicArchive { @@ -42,7 +42,7 @@ impl PublicArchive { /// Note that this does not upload the archive to the network pub fn new() -> Self { Self { - map: HashMap::new(), + map: BTreeMap::new(), } } @@ -92,7 +92,7 @@ impl PublicArchive { } /// Get the underlying map - pub fn map(&self) -> &HashMap { + pub fn map(&self) -> &BTreeMap { &self.map } From 322f4dea2ccacff04e5ad656c70504d44e5f1719 Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Tue, 7 Jan 2025 11:20:04 +0100 Subject: [PATCH 03/13] ci: check that second upload costs 0 tokens --- .github/workflows/memcheck.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index 4853442936..712971a324 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -100,6 +100,7 @@ jobs: ls -l $ANT_DATA_PATH cp ./the-test-data.zip ./the-test-data_1.zip ./target/release/ant --log-output-dest=data-dir file upload "./the-test-data_1.zip" > ./second_upload 2>&1 + rg 'Total cost: 0 AttoTokens' ./second_upload -c --stats env: ANT_LOG: "all" timeout-minutes: 25 From 5b62f1c169c7908359bc1296c04dec0f0be2d03d Mon Sep 17 00:00:00 2001 From: Benno Zeeman Date: Tue, 7 Jan 2025 14:10:09 +0100 Subject: [PATCH 04/13] ci: memcheck second upload same way as first --- .github/workflows/memcheck.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index 712971a324..4fae7db742 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -98,15 +98,14 @@ jobs: ls -l $ANT_DATA_PATH/client_first/logs mkdir $ANT_DATA_PATH/client ls -l $ANT_DATA_PATH - cp ./the-test-data.zip ./the-test-data_1.zip - ./target/release/ant --log-output-dest=data-dir file upload "./the-test-data_1.zip" > ./second_upload 2>&1 - rg 'Total 
cost: 0 AttoTokens' ./second_upload -c --stats + ./target/release/ant --log-output-dest=data-dir file upload --public "./the-test-data.zip" > ./upload_output_second 2>&1 + rg 'Total cost: 0 AttoTokens' ./upload_output_second -c --stats env: ANT_LOG: "all" timeout-minutes: 25 - name: showing the second upload terminal output - run: cat second_upload + run: cat upload_output_second shell: bash if: always() From abada20949bc25e0e9c63258f89ab02a9e817241 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 7 Jan 2025 21:11:49 +0800 Subject: [PATCH 05/13] chore: rename RecordType to ValidationType --- ant-networking/src/cmd.rs | 12 ++-- ant-networking/src/event/request_response.rs | 4 +- ant-networking/src/lib.rs | 6 +- ant-networking/src/record_store.rs | 56 +++++++++---------- ant-networking/src/record_store_api.rs | 12 ++-- ant-networking/src/replication_fetcher.rs | 29 +++++----- ant-node/src/node.rs | 6 +- ant-node/src/put_validation.rs | 39 +++++++------ ant-node/src/replication.rs | 4 +- ant-protocol/src/messages/cmd.rs | 4 +- ant-protocol/src/storage/header.rs | 2 +- ant-protocol/src/storage/mod.rs | 4 +- .../api/ant-node/README.md | 6 +- .../api/ant-node/network.md | 4 +- 14 files changed, 100 insertions(+), 88 deletions(-) diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index 17e4e3cfc9..c5191cda41 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, messages::{Cmd, Request, Response}, - storage::{RecordHeader, RecordKind, RecordType}, + storage::{RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ @@ -92,7 +92,7 @@ pub enum LocalSwarmCmd { }, /// Get the Addresses of all the Records held locally GetAllLocalRecordAddresses { - sender: oneshot::Sender>, + sender: oneshot::Sender>, }, /// Get data from the local RecordStore GetLocalRecord { @@ -120,7 +120,7 @@ pub enum 
LocalSwarmCmd { /// This should be done after the record has been stored to disk AddLocalRecordAsStored { key: RecordKey, - record_type: RecordType, + record_type: ValidationType, }, /// Add a peer to the blocklist AddPeerToBlockList { @@ -141,7 +141,7 @@ pub enum LocalSwarmCmd { quotes: Vec<(PeerId, PaymentQuote)>, }, // Notify a fetch completion - FetchCompleted((RecordKey, RecordType)), + FetchCompleted((RecordKey, ValidationType)), /// Triggers interval repliation /// NOTE: This does result in outgoing messages, but is produced locally TriggerIntervalReplication, @@ -661,13 +661,13 @@ impl SwarmDriver { let record_type = match RecordHeader::from_record(&record) { Ok(record_header) => { match record_header.kind { - RecordKind::Chunk => RecordType::Chunk, + RecordKind::Chunk => ValidationType::Chunk, RecordKind::GraphEntry | RecordKind::Pointer | RecordKind::Register | RecordKind::Scratchpad => { let content_hash = XorName::from_content(&record.value); - RecordType::NonChunk(content_hash) + ValidationType::NonChunk(content_hash) } RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment diff --git a/ant-networking/src/event/request_response.rs b/ant-networking/src/event/request_response.rs index ce6755e8dc..4d8de2131f 100644 --- a/ant-networking/src/event/request_response.rs +++ b/ant-networking/src/event/request_response.rs @@ -12,7 +12,7 @@ use crate::{ }; use ant_protocol::{ messages::{CmdResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, }; use libp2p::request_response::{self, Message}; @@ -159,7 +159,7 @@ impl SwarmDriver { fn add_keys_to_replication_fetcher( &mut self, sender: NetworkAddress, - incoming_keys: Vec<(NetworkAddress, RecordType)>, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, ) { let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index dda9e1d8d3..cb4f761655 100644 --- a/ant-networking/src/lib.rs +++ 
b/ant-networking/src/lib.rs @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics}; use ant_protocol::{ error::Error as ProtocolError, messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response}, - storage::{Pointer, RecordType, RetryStrategy, Scratchpad}, + storage::{Pointer, RetryStrategy, Scratchpad, ValidationType}, NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use futures::future::select_all; @@ -964,7 +964,7 @@ impl Network { /// Notify ReplicationFetch a fetch attempt is completed. /// (but it won't trigger any real writes to disk, say fetched an old version of register) - pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) { + pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) { self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type))) } @@ -995,7 +995,7 @@ impl Network { /// Returns the Addresses of all the locally stored Records pub async fn get_all_local_record_addresses( &self, - ) -> Result> { + ) -> Result> { let (sender, receiver) = oneshot::channel(); self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender }); diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index cabdb6611c..e9e1d2886c 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -19,7 +19,7 @@ use aes_gcm_siv::{ use ant_evm::{QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, - storage::{RecordHeader, RecordKind, RecordType}, + storage::{RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use hkdf::Hkdf; @@ -138,7 +138,7 @@ pub struct NodeRecordStore { /// The configuration of the store. 
config: NodeRecordStoreConfig, /// Main records store remains unchanged for compatibility - records: HashMap, + records: HashMap, /// Additional index organizing records by distance records_by_distance: BTreeMap, /// FIFO simple cache of records to reduce read times @@ -218,7 +218,7 @@ impl NodeRecordStore { fn update_records_from_an_existing_store( config: &NodeRecordStoreConfig, encryption_details: &(Aes256GcmSiv, [u8; 4]), - ) -> HashMap { + ) -> HashMap { let process_entry = |entry: &DirEntry| -> _ { let path = entry.path(); if path.is_file() { @@ -270,10 +270,10 @@ impl NodeRecordStore { }; let record_type = match RecordHeader::is_record_of_type_chunk(&record) { - Ok(true) => RecordType::Chunk, + Ok(true) => ValidationType::Chunk, Ok(false) => { let xorname_hash = XorName::from_content(&record.value); - RecordType::NonChunk(xorname_hash) + ValidationType::NonChunk(xorname_hash) } Err(error) => { warn!( @@ -585,7 +585,7 @@ impl NodeRecordStore { /// Returns the set of `NetworkAddress::RecordKey` held by the store /// Use `record_addresses_ref` to get a borrowed type - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { self.records .iter() .map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone())) @@ -593,14 +593,14 @@ impl NodeRecordStore { } /// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref(&self) -> &HashMap { &self.records } /// The follow up to `put_verified`, this only registers the RecordKey /// in the RecordStore records set. After this it should be safe /// to return the record as stored. 
- pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: RecordType) { + pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: ValidationType) { let addr = NetworkAddress::from_record_key(&key); let distance = self.local_address.distance(&addr); let distance_u256 = convert_distance_to_u256(&distance); @@ -648,7 +648,7 @@ impl NodeRecordStore { /// /// The record is marked as written to disk once `mark_as_stored` is called, /// this avoids us returning half-written data or registering it as stored before it is. - pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> { let key = &r.key; let record_key = PrettyPrintRecordKey::from(&r.key).into_owned(); debug!("PUTting a verified Record: {record_key:?}"); @@ -838,11 +838,11 @@ impl RecordStore for NodeRecordStore { // otherwise shall be passed further to allow different version of nonchunk // to be detected or updated. 
match self.records.get(&record.key) { - Some((_addr, RecordType::Chunk)) => { + Some((_addr, ValidationType::Chunk)) => { debug!("Chunk {record_key:?} already exists."); return Ok(()); } - Some((_addr, RecordType::NonChunk(existing_content_hash))) => { + Some((_addr, ValidationType::NonChunk(existing_content_hash))) => { let content_hash = XorName::from_content(&record.value); if content_hash == *existing_content_hash { debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists."); @@ -938,7 +938,7 @@ impl RecordStore for NodeRecordStore { /// A place holder RecordStore impl for the client that does nothing #[derive(Default, Debug)] pub struct ClientRecordStore { - empty_record_addresses: HashMap, + empty_record_addresses: HashMap, } impl ClientRecordStore { @@ -946,19 +946,19 @@ impl ClientRecordStore { false } - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { HashMap::new() } - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref(&self) -> &HashMap { &self.empty_record_addresses } - pub(crate) fn put_verified(&mut self, _r: Record, _record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, _r: Record, _record_type: ValidationType) -> Result<()> { Ok(()) } - pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: RecordType) {} + pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: ValidationType) {} } impl RecordStore for ClientRecordStore { @@ -1093,12 +1093,12 @@ mod tests { let returned_record_key = returned_record.key.clone(); assert!(store - .put_verified(returned_record, RecordType::Chunk) + .put_verified(returned_record, ValidationType::Chunk) .is_ok()); // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(returned_record_key, RecordType::Chunk); + store.mark_as_stored(returned_record_key, 
ValidationType::Chunk); // loop over store.get max_iterations times to ensure async disk write had time to complete. let max_iterations = 10; @@ -1169,7 +1169,7 @@ mod tests { // Store the chunk using put_verified assert!(store - .put_verified(record.clone(), RecordType::Chunk) + .put_verified(record.clone(), ValidationType::Chunk) .is_ok()); // Wait for the async write operation to complete @@ -1270,11 +1270,11 @@ mod tests { // Store the chunk using put_verified assert!(store - .put_verified(record.clone(), RecordType::Chunk) + .put_verified(record.clone(), ValidationType::Chunk) .is_ok()); // Mark as stored (simulating the CompletedWrite event) - store.mark_as_stored(record.key.clone(), RecordType::Chunk); + store.mark_as_stored(record.key.clone(), ValidationType::Chunk); // Verify the chunk is stored let stored_record = store.get(&record.key); @@ -1343,14 +1343,14 @@ mod tests { assert!(store .put_verified( record.clone(), - RecordType::NonChunk(XorName::from_content(&record.value)) + ValidationType::NonChunk(XorName::from_content(&record.value)) ) .is_ok()); // Mark as stored (simulating the CompletedWrite event) store.mark_as_stored( record.key.clone(), - RecordType::NonChunk(XorName::from_content(&record.value)), + ValidationType::NonChunk(XorName::from_content(&record.value)), ); // Verify the scratchpad is stored @@ -1437,7 +1437,7 @@ mod tests { }; // Will be stored anyway. 
- let succeeded = store.put_verified(record, RecordType::Chunk).is_ok(); + let succeeded = store.put_verified(record, ValidationType::Chunk).is_ok(); if !succeeded { failed_records.push(record_key.clone()); @@ -1445,7 +1445,7 @@ mod tests { } else { // We must also mark the record as stored (which would be triggered // after the async write in nodes via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), RecordType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk); println!("success sotred len: {:?} ", store.record_addresses().len()); stored_records_at_some_point.push(record_key.clone()); @@ -1499,7 +1499,7 @@ mod tests { // now for any stored data. It either shoudl still be stored OR further away than `most_distant_data` for data in stored_records_at_some_point { let data_addr = NetworkAddress::from_record_key(&data); - if !sorted_stored_data.contains(&(&data_addr, &RecordType::Chunk)) { + if !sorted_stored_data.contains(&(&data_addr, &ValidationType::Chunk)) { assert!( self_address.distance(&data_addr) > self_address.distance(most_distant_data), @@ -1558,10 +1558,10 @@ mod tests { publisher: None, expires: None, }; - assert!(store.put_verified(record, RecordType::Chunk).is_ok()); + assert!(store.put_verified(record, ValidationType::Chunk).is_ok()); // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), RecordType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk); stored_records.push(record_key.clone()); stored_records.sort_by(|a, b| { diff --git a/ant-networking/src/record_store_api.rs b/ant-networking/src/record_store_api.rs index 0955d5499f..7db4f38e54 100644 --- a/ant-networking/src/record_store_api.rs +++ b/ant-networking/src/record_store_api.rs @@ -9,7 +9,7 @@ use crate::record_store::{ClientRecordStore, NodeRecordStore}; use ant_evm::{QuotingMetrics, U256}; -use 
ant_protocol::{storage::RecordType, NetworkAddress}; +use ant_protocol::{storage::ValidationType, NetworkAddress}; use libp2p::kad::{ store::{RecordStore, Result}, ProviderRecord, Record, RecordKey, @@ -90,21 +90,23 @@ impl UnifiedRecordStore { } } - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { match self { Self::Client(store) => store.record_addresses(), Self::Node(store) => store.record_addresses(), } } - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref( + &self, + ) -> &HashMap { match self { Self::Client(store) => store.record_addresses_ref(), Self::Node(store) => store.record_addresses_ref(), } } - pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> { match self { Self::Client(store) => store.put_verified(r, record_type), Self::Node(store) => store.put_verified(r, record_type), @@ -168,7 +170,7 @@ impl UnifiedRecordStore { /// Mark the record as stored in the store. 
/// This adds it to records set, so it can now be retrieved /// (to be done after writes are finalised) - pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: RecordType) { + pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) { match self { Self::Client(store) => store.mark_as_stored(k, record_type), Self::Node(store) => store.mark_as_stored(k, record_type), diff --git a/ant-networking/src/replication_fetcher.rs b/ant-networking/src/replication_fetcher.rs index 360e2fbe6b..a4a16abe41 100644 --- a/ant-networking/src/replication_fetcher.rs +++ b/ant-networking/src/replication_fetcher.rs @@ -11,7 +11,7 @@ use crate::time::spawn; use crate::{event::NetworkEvent, time::Instant}; use ant_evm::U256; use ant_protocol::{ - convert_distance_to_u256, storage::RecordType, NetworkAddress, PrettyPrintRecordKey, + convert_distance_to_u256, storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ kad::{KBucketDistance as Distance, RecordKey, K_VALUE}, @@ -40,9 +40,9 @@ type ReplicationTimeout = Instant; pub(crate) struct ReplicationFetcher { self_peer_id: PeerId, // Pending entries that to be fetched from the target peer. - to_be_fetched: HashMap<(RecordKey, RecordType, PeerId), ReplicationTimeout>, + to_be_fetched: HashMap<(RecordKey, ValidationType, PeerId), ReplicationTimeout>, // Avoid fetching same chunk from different nodes AND carry out too many parallel tasks. 
- on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>, + on_going_fetches: HashMap<(RecordKey, ValidationType), (PeerId, ReplicationTimeout)>, event_sender: mpsc::Sender, /// Distance range that the incoming key shall be fetched distance_range: Option, @@ -77,8 +77,8 @@ impl ReplicationFetcher { pub(crate) fn add_keys( &mut self, holder: PeerId, - incoming_keys: Vec<(NetworkAddress, RecordType)>, - locally_stored_keys: &HashMap, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, + locally_stored_keys: &HashMap, ) -> Vec<(PeerId, RecordKey)> { // Pre-calculate self_address since it's used multiple times let self_address = NetworkAddress::from_peer(self.self_peer_id); @@ -207,7 +207,7 @@ impl ReplicationFetcher { pub(crate) fn notify_about_new_put( &mut self, new_put: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) -> Vec<(PeerId, RecordKey)> { self.to_be_fetched .retain(|(key, t, _), _| key != &new_put || t != &record_type); @@ -222,7 +222,7 @@ impl ReplicationFetcher { pub(crate) fn notify_fetch_early_completed( &mut self, key_in: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) -> Vec<(PeerId, RecordKey)> { self.to_be_fetched.retain(|(key, current_type, _), _| { if current_type == &record_type { @@ -368,7 +368,7 @@ impl ReplicationFetcher { /// This checks the hash on transactions to ensure we pull in divergent transactions. 
fn remove_stored_keys( &mut self, - existing_keys: &HashMap, + existing_keys: &HashMap, ) { self.to_be_fetched.retain(|(key, t, _), _| { if let Some((_addr, record_type)) = existing_keys.get(key) { @@ -412,7 +412,7 @@ impl ReplicationFetcher { #[cfg(test)] mod tests { use super::{ReplicationFetcher, FETCH_TIMEOUT, MAX_PARALLEL_FETCH}; - use ant_protocol::{convert_distance_to_u256, storage::RecordType, NetworkAddress}; + use ant_protocol::{convert_distance_to_u256, storage::ValidationType, NetworkAddress}; use eyre::Result; use libp2p::{kad::RecordKey, PeerId}; use std::{collections::HashMap, time::Duration}; @@ -430,7 +430,7 @@ mod tests { (0..MAX_PARALLEL_FETCH * 2).for_each(|_| { let random_data: Vec = (0..50).map(|_| rand::random::()).collect(); let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); - incoming_keys.push((key, RecordType::Chunk)); + incoming_keys.push((key, ValidationType::Chunk)); }); let keys_to_fetch = @@ -444,7 +444,10 @@ mod tests { let key_2 = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( PeerId::random(), - vec![(key_1, RecordType::Chunk), (key_2, RecordType::Chunk)], + vec![ + (key_1, ValidationType::Chunk), + (key_2, ValidationType::Chunk), + ], &locally_stored_keys, ); assert!(keys_to_fetch.is_empty()); @@ -454,7 +457,7 @@ mod tests { let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( PeerId::random(), - vec![(key, RecordType::Chunk)], + vec![(key, ValidationType::Chunk)], &locally_stored_keys, ); assert!(!keys_to_fetch.is_empty()); @@ -496,7 +499,7 @@ mod tests { in_range_keys += 1; } - incoming_keys.push((key, RecordType::Chunk)); + incoming_keys.push((key, ValidationType::Chunk)); }); let keys_to_fetch = diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 3877a31a18..97bdd937c7 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -23,7 +23,7 @@ use 
ant_protocol::{ convert_distance_to_u256, error::Error as ProtocolError, messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use bytes::Bytes; @@ -811,7 +811,7 @@ impl Node { all_local_records .iter() .filter_map(|(addr, record_type)| { - if *record_type == RecordType::Chunk { + if *record_type == ValidationType::Chunk { Some(addr.clone()) } else { None @@ -876,7 +876,7 @@ impl Node { all_keys .iter() .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { + if ValidationType::Chunk == *record_type { Some(addr.clone()) } else { None diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 946b9168e9..0925c3d1f6 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -16,7 +16,7 @@ use ant_protocol::storage::GraphEntry; use ant_protocol::{ storage::{ try_deserialize_record, try_serialize_record, Chunk, GraphEntryAddress, Pointer, - RecordHeader, RecordKind, RecordType, Scratchpad, + RecordHeader, RecordKind, Scratchpad, ValidationType, }, NetworkAddress, PrettyPrintRecordKey, }; @@ -49,13 +49,13 @@ impl Node { // if we're receiving this chunk PUT again, and we have been paid, // we eagerly retry replicaiton as it seems like other nodes are having trouble // did not manage to get this chunk as yet - self.replicate_valid_fresh_record(record_key, RecordType::Chunk); + self.replicate_valid_fresh_record(record_key, ValidationType::Chunk); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. 
self.network() - .notify_fetch_completed(record.key.clone(), RecordType::Chunk); + .notify_fetch_completed(record.key.clone(), ValidationType::Chunk); debug!( "Chunk with addr {:?} already exists: {already_exists}, payment extracted.", @@ -75,13 +75,13 @@ impl Node { if store_chunk_result.is_ok() { Marker::ValidPaidChunkPutFromClient(&PrettyPrintRecordKey::from(&record.key)) .log(); - self.replicate_valid_fresh_record(record_key, RecordType::Chunk); + self.replicate_valid_fresh_record(record_key, ValidationType::Chunk); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. self.network() - .notify_fetch_completed(record.key.clone(), RecordType::Chunk); + .notify_fetch_completed(record.key.clone(), ValidationType::Chunk); } store_chunk_result @@ -132,14 +132,16 @@ impl Node { .log(); self.replicate_valid_fresh_record( record_key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. - self.network() - .notify_fetch_completed(record_key, RecordType::NonChunk(content_hash)); + self.network().notify_fetch_completed( + record_key, + ValidationType::NonChunk(content_hash), + ); } Err(_) => {} } @@ -213,7 +215,7 @@ impl Node { .log(); self.replicate_valid_fresh_record( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. @@ -221,7 +223,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. 
self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -258,7 +260,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } else { warn!("Failed to store register update at {pretty_key:?}"); @@ -307,7 +309,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -357,13 +359,13 @@ impl Node { .log(); self.replicate_valid_fresh_record( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -561,7 +563,10 @@ impl Node { if is_client_put { let content_hash = XorName::from_content(&record.value); - self.replicate_valid_fresh_record(scratchpad_key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record( + scratchpad_key, + ValidationType::NonChunk(content_hash), + ); } Ok(()) @@ -613,7 +618,7 @@ impl Node { // However, to avoid `looping of replication`, a `replicated in` register // shall not trigger any further replication out. 
if is_client_put { - self.replicate_valid_fresh_record(key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record(key, ValidationType::NonChunk(content_hash)); } Ok(()) @@ -880,7 +885,7 @@ impl Node { self.network().put_local_record(record); let content_hash = XorName::from_content(&pointer.network_address().to_bytes()); - self.replicate_valid_fresh_record(key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record(key, ValidationType::NonChunk(content_hash)); Ok(()) } diff --git a/ant-node/src/replication.rs b/ant-node/src/replication.rs index 130b23e1f0..b34d6c1a71 100644 --- a/ant-node/src/replication.rs +++ b/ant-node/src/replication.rs @@ -10,7 +10,7 @@ use crate::{error::Result, node::Node}; use ant_networking::{GetRecordCfg, Network}; use ant_protocol::{ messages::{Cmd, Query, QueryResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ @@ -106,7 +106,7 @@ impl Node { pub(crate) fn replicate_valid_fresh_record( &self, paid_key: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) { let network = self.network().clone(); diff --git a/ant-protocol/src/messages/cmd.rs b/ant-protocol/src/messages/cmd.rs index f0f5e089b4..83d2ed7fa0 100644 --- a/ant-protocol/src/messages/cmd.rs +++ b/ant-protocol/src/messages/cmd.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. #![allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress -use crate::{storage::RecordType, NetworkAddress}; +use crate::{storage::ValidationType, NetworkAddress}; use serde::{Deserialize, Serialize}; /// Ant protocol cmds @@ -25,7 +25,7 @@ pub enum Cmd { /// Holder of the replication keys. holder: NetworkAddress, /// Keys of copy that shall be replicated. 
- keys: Vec<(NetworkAddress, RecordType)>, + keys: Vec<(NetworkAddress, ValidationType)>, }, /// Notify the peer it is now being considered as BAD due to the included behaviour PeerConsideredAsBad { diff --git a/ant-protocol/src/storage/header.rs b/ant-protocol/src/storage/header.rs index 00a4c13003..a67274d5be 100644 --- a/ant-protocol/src/storage/header.rs +++ b/ant-protocol/src/storage/header.rs @@ -19,7 +19,7 @@ use xor_name::XorName; /// This is to be only used within the node instance to reflect different content version. /// Hence, only need to have two entries: Chunk and NonChunk. #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] -pub enum RecordType { +pub enum ValidationType { Chunk, NonChunk(XorName), } diff --git a/ant-protocol/src/storage/mod.rs b/ant-protocol/src/storage/mod.rs index 706beaede0..033aaab757 100644 --- a/ant-protocol/src/storage/mod.rs +++ b/ant-protocol/src/storage/mod.rs @@ -22,7 +22,9 @@ pub use self::{ address::{ChunkAddress, GraphEntryAddress, PointerAddress, ScratchpadAddress}, chunks::Chunk, graph::GraphEntry, - header::{try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, RecordType}, + header::{ + try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, ValidationType, + }, scratchpad::Scratchpad, }; diff --git a/docs/online-documentation/api/ant-node/README.md b/docs/online-documentation/api/ant-node/README.md index f14302a0a7..989885499b 100644 --- a/docs/online-documentation/api/ant-node/README.md +++ b/docs/online-documentation/api/ant-node/README.md @@ -123,12 +123,12 @@ The Ant Node provides a comprehensive API for running and managing nodes in the === "Rust" ```rust - use ant_protocol::storage::RecordType; + use ant_protocol::storage::ValidationType; // Store data let key = "0123456789abcdef"; // Hex string let value = b"Hello, World!"; - node.store_record(key, value, RecordType::Chunk)?; + node.store_record(key, value, ValidationType::Chunk)?; // Retrieve data let 
data = node.get_record(key)?; @@ -230,7 +230,7 @@ The Ant Node provides a comprehensive API for running and managing nodes in the ```rust use ant_node::error::Error; - match node.store_record(key, value, RecordType::Chunk) { + match node.store_record(key, value, ValidationType::Chunk) { Ok(_) => println!("Record stored successfully"), Err(Error::StorageFull) => println!("Storage is full"), Err(Error::InvalidKey) => println!("Invalid key format"), diff --git a/docs/online-documentation/api/ant-node/network.md b/docs/online-documentation/api/ant-node/network.md index 214d9cdc34..57bfc82809 100644 --- a/docs/online-documentation/api/ant-node/network.md +++ b/docs/online-documentation/api/ant-node/network.md @@ -172,12 +172,12 @@ This page documents the network operations available in the Ant Node API. === "Rust" ```rust - use ant_node::storage::RecordType; + use ant_node::storage::ValidationType; // Store a record let key = "0123456789abcdef"; // Hex string let value = b"Hello, World!"; - node.store_record(key, value, RecordType::Chunk)?; + node.store_record(key, value, ValidationType::Chunk)?; // Retrieve a record let data = node.get_record(key)?; From bb7ae764707430c5c31768327e8be9f2b3a2a438 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 10:28:00 +0100 Subject: [PATCH 06/13] feat: add arbitrum-sepolia-test EVM network --- evmlib/src/lib.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/evmlib/src/lib.rs b/evmlib/src/lib.rs index e2715c0ed6..bb8fff8047 100644 --- a/evmlib/src/lib.rs +++ b/evmlib/src/lib.rs @@ -27,6 +27,9 @@ pub mod testnet; pub mod utils; pub mod wallet; +/// Timeout for transactions +const TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + static PUBLIC_ARBITRUM_ONE_HTTP_RPC_URL: LazyLock = LazyLock::new(|| { "https://arb1.arbitrum.io/rpc" .parse() @@ -45,6 +48,9 @@ const ARBITRUM_ONE_PAYMENT_TOKEN_ADDRESS: Address = const ARBITRUM_SEPOLIA_PAYMENT_TOKEN_ADDRESS: Address = 
address!("BE1802c27C324a28aeBcd7eeC7D734246C807194"); +const ARBITRUM_SEPOLIA_TEST_PAYMENT_TOKEN_ADDRESS: Address = + address!("4bc1aCE0E66170375462cB4E6Af42Ad4D5EC689C"); + // Should be updated when the smart contract changes! const ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS: Address = address!("607483B50C5F06c25cDC316b6d1E071084EeC9f5"); @@ -52,8 +58,8 @@ const ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS: Address = const ARBITRUM_SEPOLIA_DATA_PAYMENTS_ADDRESS: Address = address!("993C7739f50899A997fEF20860554b8a28113634"); -/// Timeout for transactions -const TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); +const ARBITRUM_SEPOLIA_TEST_DATA_PAYMENTS_ADDRESS: Address = + address!("7f0842a78f7d4085d975ba91d630d680f91b1295"); #[serde_as] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -81,6 +87,7 @@ pub enum Network { #[default] ArbitrumOne, ArbitrumSepolia, + ArbitrumSepoliaTest, Custom(CustomNetwork), } @@ -89,6 +96,7 @@ impl std::fmt::Display for Network { match self { Network::ArbitrumOne => write!(f, "evm-arbitrum-one"), Network::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"), + Network::ArbitrumSepoliaTest => write!(f, "evm-arbitrum-sepolia-test"), Network::Custom(_) => write!(f, "evm-custom"), } } @@ -107,6 +115,7 @@ impl Network { match self { Network::ArbitrumOne => "arbitrum-one", Network::ArbitrumSepolia => "arbitrum-sepolia", + Network::ArbitrumSepoliaTest => "arbitrum-sepolia-test", Network::Custom(_) => "custom", } } @@ -115,6 +124,7 @@ impl Network { match self { Network::ArbitrumOne => &PUBLIC_ARBITRUM_ONE_HTTP_RPC_URL, Network::ArbitrumSepolia => &PUBLIC_ARBITRUM_SEPOLIA_HTTP_RPC_URL, + Network::ArbitrumSepoliaTest => &PUBLIC_ARBITRUM_SEPOLIA_HTTP_RPC_URL, Network::Custom(custom) => &custom.rpc_url_http, } } @@ -123,6 +133,7 @@ impl Network { match self { Network::ArbitrumOne => &ARBITRUM_ONE_PAYMENT_TOKEN_ADDRESS, Network::ArbitrumSepolia => &ARBITRUM_SEPOLIA_PAYMENT_TOKEN_ADDRESS, + Network::ArbitrumSepoliaTest => 
&ARBITRUM_SEPOLIA_TEST_PAYMENT_TOKEN_ADDRESS, Network::Custom(custom) => &custom.payment_token_address, } } @@ -131,6 +142,7 @@ impl Network { match self { Network::ArbitrumOne => &ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS, Network::ArbitrumSepolia => &ARBITRUM_SEPOLIA_DATA_PAYMENTS_ADDRESS, + Network::ArbitrumSepoliaTest => &ARBITRUM_SEPOLIA_TEST_DATA_PAYMENTS_ADDRESS, Network::Custom(custom) => &custom.data_payments_address, } } From 3c97c91e9c6466480745c5713a088bd2f47e1e39 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 12:19:32 +0100 Subject: [PATCH 07/13] test: add quoting test and rename existing tests --- evmlib/tests/payment_vault.rs | 78 +++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/evmlib/tests/payment_vault.rs b/evmlib/tests/payment_vault.rs index 41c5881cbb..b9437c6f6d 100644 --- a/evmlib/tests/payment_vault.rs +++ b/evmlib/tests/payment_vault.rs @@ -116,8 +116,8 @@ async fn test_deploy() { } #[tokio::test] -async fn test_proxy_reachable() { - let network = Network::ArbitrumOne; +async fn test_proxy_reachable_on_arb_sepolia() { + let network = Network::ArbitrumSepolia; let provider = http_provider(network.rpc_url().clone()); let payment_vault = PaymentVaultHandler::new(*network.data_payments_address(), provider); @@ -130,12 +130,38 @@ async fn test_proxy_reachable() { } #[tokio::test] -async fn test_verify_payment() { +async fn test_get_quote_on_arb_sepolia_test() { + let network = Network::ArbitrumSepoliaTest; + let provider = http_provider(network.rpc_url().clone()); + let payment_vault = PaymentVaultHandler::new(*network.data_payments_address(), provider); + + let quoting_metrics = QuotingMetrics { + close_records_stored: 10, + max_records: 16 * 1024, + received_payment_count: 0, + live_time: 1400, + network_density: Some([ + 4, 4, 224, 228, 247, 252, 14, 44, 67, 21, 153, 47, 244, 18, 232, 1, 152, 195, 44, 43, + 29, 135, 19, 217, 240, 129, 64, 245, 240, 227, 129, 162, + ]), + network_size: 
Some(240), + }; + + let amount = payment_vault + .get_quote(vec![quoting_metrics]) + .await + .unwrap(); + + assert_eq!(amount, vec![Amount::from(610678225049958_u64)]); +} + +#[tokio::test] +async fn test_pay_for_quotes_on_local() { let (_anvil, network_token, mut payment_vault) = setup().await; let mut quote_payments = vec![]; - for _ in 0..5 { + for _ in 0..MAX_TRANSFERS_PER_TRANSACTION { let quote_payment = random_quote_payment(); quote_payments.push(quote_payment); } @@ -149,36 +175,18 @@ async fn test_verify_payment() { // so we set it to the same as the network token contract payment_vault.set_provider(network_token.contract.provider().clone()); - let result = payment_vault.pay_for_quotes(quote_payments.clone()).await; + let result = payment_vault.pay_for_quotes(quote_payments).await; assert!(result.is_ok(), "Failed with error: {:?}", result.err()); - - let payment_verifications: Vec<_> = quote_payments - .into_iter() - .map(|v| interface::IPaymentVault::PaymentVerification { - metrics: QuotingMetrics::default().into(), - rewardsAddress: v.1, - quoteHash: v.0, - }) - .collect(); - - let results = payment_vault - .verify_payment(payment_verifications) - .await - .expect("Verify payment failed"); - - for result in results { - assert!(result.isValid); - } } #[tokio::test] -async fn test_pay_for_quotes() { +async fn test_verify_payment_on_local() { let (_anvil, network_token, mut payment_vault) = setup().await; let mut quote_payments = vec![]; - for _ in 0..MAX_TRANSFERS_PER_TRANSACTION { + for _ in 0..5 { let quote_payment = random_quote_payment(); quote_payments.push(quote_payment); } @@ -192,7 +200,25 @@ async fn test_pay_for_quotes() { // so we set it to the same as the network token contract payment_vault.set_provider(network_token.contract.provider().clone()); - let result = payment_vault.pay_for_quotes(quote_payments).await; + let result = payment_vault.pay_for_quotes(quote_payments.clone()).await; assert!(result.is_ok(), "Failed with error: {:?}", 
result.err()); + + let payment_verifications: Vec<_> = quote_payments + .into_iter() + .map(|v| interface::IPaymentVault::PaymentVerification { + metrics: QuotingMetrics::default().into(), + rewardsAddress: v.1, + quoteHash: v.0, + }) + .collect(); + + let results = payment_vault + .verify_payment(payment_verifications) + .await + .expect("Verify payment failed"); + + for result in results { + assert!(result.isValid); + } } From 0a7422e2bd2d701a91729999bdefdfe68b0d39b6 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 13:06:44 +0100 Subject: [PATCH 08/13] feat: add arb sepolia test network subcommand to node and manager --- ant-node-manager/src/bin/cli/subcommands/evm_network.rs | 4 ++++ ant-node/src/bin/antnode/subcommands.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ant-node-manager/src/bin/cli/subcommands/evm_network.rs b/ant-node-manager/src/bin/cli/subcommands/evm_network.rs index 2d795846cf..868c33aea2 100644 --- a/ant-node-manager/src/bin/cli/subcommands/evm_network.rs +++ b/ant-node-manager/src/bin/cli/subcommands/evm_network.rs @@ -19,6 +19,9 @@ pub enum EvmNetworkCommand { /// Use the Arbitrum Sepolia network EvmArbitrumSepolia, + /// Use the Arbitrum Sepolia network with test contracts + EvmArbitrumSepoliaTest, + /// Use a custom network EvmCustom { /// The RPC URL for the custom network @@ -45,6 +48,7 @@ impl TryInto for EvmNetworkCommand { match self { Self::EvmArbitrumOne => Ok(EvmNetwork::ArbitrumOne), Self::EvmArbitrumSepolia => Ok(EvmNetwork::ArbitrumSepolia), + Self::EvmArbitrumSepoliaTest => Ok(EvmNetwork::ArbitrumSepoliaTest), Self::EvmLocal => { if !cfg!(feature = "local") { return Err(color_eyre::eyre::eyre!( diff --git a/ant-node/src/bin/antnode/subcommands.rs b/ant-node/src/bin/antnode/subcommands.rs index a9e02d2be4..52c48f1ea7 100644 --- a/ant-node/src/bin/antnode/subcommands.rs +++ b/ant-node/src/bin/antnode/subcommands.rs @@ -10,6 +10,9 @@ pub(crate) enum EvmNetworkCommand { /// Use the Arbitrum Sepolia network 
EvmArbitrumSepolia, + /// Use the Arbitrum Sepolia network with test contracts + EvmArbitrumSepoliaTest, + /// Use a custom network EvmCustom { /// The RPC URL for the custom network @@ -32,6 +35,7 @@ impl Into for EvmNetworkCommand { match self { Self::EvmArbitrumOne => EvmNetwork::ArbitrumOne, Self::EvmArbitrumSepolia => EvmNetwork::ArbitrumSepolia, + Self::EvmArbitrumSepoliaTest => EvmNetwork::ArbitrumSepoliaTest, Self::EvmCustom { rpc_url, payment_token_address, From 8da56e549848feb6e35196f02e46f5c71eb537f5 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 13:41:27 +0100 Subject: [PATCH 09/13] feat: add arb sepolia test network to evmlib util --- evmlib/src/utils.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/evmlib/src/utils.rs b/evmlib/src/utils.rs index 4e3133713f..8e679d95d7 100644 --- a/evmlib/src/utils.rs +++ b/evmlib/src/utils.rs @@ -112,12 +112,19 @@ pub fn get_evm_network_from_env() -> Result { .map(|v| v == "arbitrum-sepolia") .unwrap_or(false); + let use_arbitrum_sepolia_test = std::env::var("EVM_NETWORK") + .map(|v| v == "arbitrum-sepolia-test") + .unwrap_or(false); + if use_arbitrum_one { info!("Using Arbitrum One EVM network as EVM_NETWORK is set to 'arbitrum-one'"); Ok(Network::ArbitrumOne) } else if use_arbitrum_sepolia { info!("Using Arbitrum Sepolia EVM network as EVM_NETWORK is set to 'arbitrum-sepolia'"); Ok(Network::ArbitrumSepolia) + } else if use_arbitrum_sepolia_test { + info!("Using Arbitrum Sepolia Test EVM network as EVM_NETWORK is set to 'arbitrum-sepolia-test'"); + Ok(Network::ArbitrumSepoliaTest) } else if let Ok(evm_vars) = evm_vars { info!("Using custom EVM network from environment variables"); Ok(Network::Custom(CustomNetwork::new( From ef35c1b21a9bb72f39f2acc280f2b24842bdd65c Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 14:09:05 +0100 Subject: [PATCH 10/13] chore: add more logging to the fetch store quotes fn --- autonomi/src/client/quote.rs | 9 +++++++++ 1 file changed, 9 
insertions(+) diff --git a/autonomi/src/client/quote.rs b/autonomi/src/client/quote.rs index ca8c515ad4..b89e1bbf34 100644 --- a/autonomi/src/client/quote.rs +++ b/autonomi/src/client/quote.rs @@ -62,6 +62,7 @@ impl Client { .into_iter() .map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr)) .collect(); + let raw_quotes_per_addr = futures::future::try_join_all(futures).await?; // choose the quotes to pay for each address @@ -70,9 +71,15 @@ impl Client { let mut rate_limiter = RateLimiter::new(); for (content_addr, raw_quotes) in raw_quotes_per_addr { + debug!( + "fetching market price for content_addr: {content_addr}, with {} quotes.", + raw_quotes.len() + ); + // FIXME: find better way to deal with paid content addrs and feedback to the user // assume that content addr is already paid for and uploaded if raw_quotes.is_empty() { + debug!("content_addr: {content_addr} is already paid for. No need to fetch market price."); continue; } @@ -90,6 +97,8 @@ impl Client { ) .await?; + debug!("market prices: {all_prices:?}"); + let mut prices: Vec<(PeerId, PaymentQuote, Amount)> = all_prices .into_iter() .zip(raw_quotes.into_iter()) From e837c8e683e967663f9d249d3fbfbe92cae4a4b9 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 7 Jan 2025 16:17:25 +0100 Subject: [PATCH 11/13] chore: set default evm network to Arbitrum Sepolia --- evmlib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/evmlib/src/lib.rs b/evmlib/src/lib.rs index e2715c0ed6..480ac8270b 100644 --- a/evmlib/src/lib.rs +++ b/evmlib/src/lib.rs @@ -78,8 +78,8 @@ impl CustomNetwork { #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub enum Network { - #[default] ArbitrumOne, + #[default] ArbitrumSepolia, Custom(CustomNetwork), } From 22123636f2509a4177f74e5202dc9d1a6f176563 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 7 Jan 2025 16:18:30 +0100 Subject: [PATCH 12/13] feat: add `evm_network` field to `ClientConfig` --- 
autonomi/src/client/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index b6ddcfbbb9..c60424735a 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -82,6 +82,9 @@ pub struct ClientConfig { /// /// If not provided, the client will use the default bootstrap peers. pub peers: Option>, + + /// EVM network to use for quotations and payments. + pub evm_network: EvmNetwork, } impl Default for ClientConfig { @@ -92,6 +95,7 @@ impl Default for ClientConfig { #[cfg(not(feature = "local"))] local: false, peers: None, + evm_network: Default::default(), } } } @@ -151,6 +155,7 @@ impl Client { Self::init_with_config(ClientConfig { local, peers: Some(peers), + evm_network: Default::default(), }) .await } From efa057c58c9a5e5f48c80b6e29e42e021baf2f77 Mon Sep 17 00:00:00 2001 From: qima Date: Wed, 8 Jan 2025 00:08:05 +0800 Subject: [PATCH 13/13] chore: refactor RecordKind, creating DataTypes --- Cargo.lock | 1 + ant-networking/src/cmd.rs | 15 +-- ant-networking/src/event/kad.rs | 4 +- ant-networking/src/graph.rs | 8 +- ant-networking/src/lib.rs | 60 ++++++------ ant-networking/src/record_store.rs | 25 ++--- ant-node/src/metrics.rs | 18 ++-- ant-node/src/put_validation.rs | 67 ++++++++------ ant-protocol/Cargo.toml | 1 + ant-protocol/src/storage/header.rs | 141 +++++++++++++++++------------ ant-protocol/src/storage/mod.rs | 3 +- autonomi/src/client/data/public.rs | 6 +- autonomi/src/client/graph.rs | 11 ++- autonomi/src/client/pointer.rs | 13 ++- autonomi/src/client/registers.rs | 13 ++- autonomi/src/client/utils.rs | 13 ++- autonomi/src/client/vault.rs | 21 +++-- 17 files changed, 229 insertions(+), 191 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f220d37552..1af17e51ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1078,6 +1078,7 @@ dependencies = [ "hex", "lazy_static", "libp2p", + "prometheus-client", "prost 0.9.0", "rand 0.8.5", "rmp-serde", diff --git 
a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index c5191cda41..1475f97740 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, messages::{Cmd, Request, Response}, - storage::{RecordHeader, RecordKind, ValidationType}, + storage::{DataTypes, RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ @@ -661,19 +661,12 @@ impl SwarmDriver { let record_type = match RecordHeader::from_record(&record) { Ok(record_header) => { match record_header.kind { - RecordKind::Chunk => ValidationType::Chunk, - RecordKind::GraphEntry - | RecordKind::Pointer - | RecordKind::Register - | RecordKind::Scratchpad => { + RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk, + RecordKind::DataOnly(_) => { let content_hash = XorName::from_content(&record.value); ValidationType::NonChunk(content_hash) } - RecordKind::ChunkWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::PointerWithPayment - | RecordKind::GraphEntryWithPayment - | RecordKind::ScratchpadWithPayment => { + RecordKind::DataWithPayment(_) => { error!("Record {record_key:?} with payment shall not be stored locally."); return Err(NetworkError::InCorrectRecordHeader); } diff --git a/ant-networking/src/event/kad.rs b/ant-networking/src/event/kad.rs index 6dcf286cdf..8cd0735fcc 100644 --- a/ant-networking/src/event/kad.rs +++ b/ant-networking/src/event/kad.rs @@ -11,7 +11,7 @@ use crate::{ GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE, }; use ant_protocol::{ - storage::{try_serialize_record, GraphEntry, RecordKind}, + storage::{try_serialize_record, DataTypes, GraphEntry, RecordKind}, NetworkAddress, PrettyPrintRecordKey, }; use itertools::Itertools; @@ -415,7 +415,7 @@ impl SwarmDriver { let bytes = try_serialize_record( &accumulated_transactions, - RecordKind::GraphEntry, + 
RecordKind::DataOnly(DataTypes::GraphEntry), )?; let new_accumulated_record = Record { diff --git a/ant-networking/src/graph.rs b/ant-networking/src/graph.rs index d58e77599c..d38c56de03 100644 --- a/ant-networking/src/graph.rs +++ b/ant-networking/src/graph.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{driver::GetRecordCfg, Network, NetworkError, Result}; -use ant_protocol::storage::{GraphEntry, GraphEntryAddress}; +use ant_protocol::storage::{DataTypes, GraphEntry, GraphEntryAddress}; use ant_protocol::{ storage::{try_deserialize_record, RecordHeader, RecordKind, RetryStrategy}, NetworkAddress, PrettyPrintRecordKey, @@ -37,7 +37,7 @@ impl Network { pub fn get_graph_entry_from_record(record: &Record) -> Result> { let header = RecordHeader::from_record(record)?; - if let RecordKind::GraphEntry = header.kind { + if let RecordKind::DataOnly(DataTypes::GraphEntry) = header.kind { let transactions = try_deserialize_record::>(record)?; Ok(transactions) } else { @@ -45,6 +45,8 @@ pub fn get_graph_entry_from_record(record: &Record) -> Result> { "RecordKind mismatch while trying to retrieve graph_entry from record {:?}", PrettyPrintRecordKey::from(&record.key) ); - Err(NetworkError::RecordKindMismatch(RecordKind::GraphEntry)) + Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly( + DataTypes::GraphEntry, + ))) } } diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index cb4f761655..413c7eb730 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics}; use ant_protocol::{ error::Error as ProtocolError, messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response}, - storage::{Pointer, RetryStrategy, Scratchpad, ValidationType}, + storage::{DataTypes, Pointer, RetryStrategy, Scratchpad, ValidationType}, NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use 
futures::future::select_all; @@ -632,16 +632,11 @@ impl Network { } match kind { - RecordKind::Chunk - | RecordKind::ChunkWithPayment - | RecordKind::GraphEntryWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::PointerWithPayment - | RecordKind::ScratchpadWithPayment => { + RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => { error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping."); continue; } - RecordKind::GraphEntry => { + RecordKind::DataOnly(DataTypes::GraphEntry) => { info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions"); match get_graph_entry_from_record(record) { @@ -653,7 +648,7 @@ impl Network { } } } - RecordKind::Register => { + RecordKind::DataOnly(DataTypes::Register) => { info!("For record {pretty_key:?}, we have a split record for a register. Accumulating registers"); let Ok(register) = try_deserialize_record::(record) else { error!( @@ -675,7 +670,7 @@ impl Network { } } } - RecordKind::Pointer => { + RecordKind::DataOnly(DataTypes::Pointer) => { info!("For record {pretty_key:?}, we have a split record for a pointer. Selecting the one with the highest count"); let Ok(pointer) = try_deserialize_record::(record) else { error!( @@ -697,7 +692,7 @@ impl Network { } valid_pointer = Some(pointer); } - RecordKind::Scratchpad => { + RecordKind::DataOnly(DataTypes::Scratchpad) => { info!("For record {pretty_key:?}, we have a split record for a scratchpad. 
Selecting the one with the highest count"); let Ok(scratchpad) = try_deserialize_record::(record) else { error!( @@ -733,7 +728,7 @@ impl Network { .collect::>(); let record = Record { key: key.clone(), - value: try_serialize_record(&accumulated_transactions, RecordKind::GraphEntry) + value: try_serialize_record(&accumulated_transactions, RecordKind::DataOnly(DataTypes::GraphEntry)) .map_err(|err| { error!( "Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}" @@ -754,14 +749,15 @@ impl Network { acc }); - let record_value = try_serialize_record(&signed_register, RecordKind::Register) - .map_err(|err| { - error!( + let record_value = + try_serialize_record(&signed_register, RecordKind::DataOnly(DataTypes::Register)) + .map_err(|err| { + error!( "Error while serializing the merged register for {pretty_key:?}: {err:?}" ); - NetworkError::from(err) - })? - .to_vec(); + NetworkError::from(err) + })? + .to_vec(); let record = Record { key: key.clone(), @@ -772,12 +768,13 @@ impl Network { return Ok(Some(record)); } else if let Some(pointer) = valid_pointer { info!("For record {pretty_key:?} task found a valid pointer, returning it."); - let record_value = try_serialize_record(&pointer, RecordKind::Pointer) - .map_err(|err| { - error!("Error while serializing the pointer for {pretty_key:?}: {err:?}"); - NetworkError::from(err) - })? - .to_vec(); + let record_value = + try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer)) + .map_err(|err| { + error!("Error while serializing the pointer for {pretty_key:?}: {err:?}"); + NetworkError::from(err) + })? 
+ .to_vec(); let record = Record { key: key.clone(), @@ -788,12 +785,15 @@ impl Network { return Ok(Some(record)); } else if let Some(scratchpad) = valid_scratchpad { info!("For record {pretty_key:?} task found a valid scratchpad, returning it."); - let record_value = try_serialize_record(&scratchpad, RecordKind::Scratchpad) - .map_err(|err| { - error!("Error while serializing the scratchpad for {pretty_key:?}: {err:?}"); - NetworkError::from(err) - })? - .to_vec(); + let record_value = + try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad)) + .map_err(|err| { + error!( + "Error while serializing the scratchpad for {pretty_key:?}: {err:?}" + ); + NetworkError::from(err) + })? + .to_vec(); let record = Record { key: key.clone(), diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index e9e1d2886c..ef32b98381 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -820,19 +820,11 @@ impl RecordStore for NodeRecordStore { match RecordHeader::from_record(&record) { Ok(record_header) => { match record_header.kind { - RecordKind::ChunkWithPayment - | RecordKind::GraphEntryWithPayment - | RecordKind::PointerWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::ScratchpadWithPayment => { + RecordKind::DataWithPayment(_) => { debug!("Record {record_key:?} with payment shall always be processed."); } // Shall not use wildcard, to avoid mis-match during enum update. - RecordKind::Chunk - | RecordKind::GraphEntry - | RecordKind::Pointer - | RecordKind::Register - | RecordKind::Scratchpad => { + RecordKind::DataOnly(_) => { // Chunk with existing key do not to be stored again. 
// Others with same content_hash do not to be stored again, // otherwise shall be passed further to allow different version of nonchunk @@ -1003,7 +995,7 @@ mod tests { use ant_protocol::convert_distance_to_u256; use ant_protocol::storage::{ - try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, Scratchpad, + try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, DataTypes, Scratchpad, }; use assert_fs::{ fixture::{PathChild, PathCreateDir}, @@ -1036,7 +1028,7 @@ mod tests { fn arbitrary(g: &mut Gen) -> ArbitraryRecord { let value = match try_serialize_record( &(0..50).map(|_| rand::random::()).collect::(), - RecordKind::Chunk, + RecordKind::DataOnly(DataTypes::Chunk), ) { Ok(value) => value.to_vec(), Err(err) => panic!("Cannot generate record value {err:?}"), @@ -1162,7 +1154,7 @@ mod tests { // Create a record from the chunk let record = Record { key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(), - value: try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec(), + value: try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk))?.to_vec(), expires: None, publisher: None, }; @@ -1334,7 +1326,8 @@ mod tests { // Create a record from the scratchpad let record = Record { key: NetworkAddress::ScratchpadAddress(scratchpad_address).to_record_key(), - value: try_serialize_record(&scratchpad, RecordKind::Scratchpad)?.to_vec(), + value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))? 
+ .to_vec(), expires: None, publisher: None, }; @@ -1424,7 +1417,7 @@ mod tests { let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key(); let value = match try_serialize_record( &(0..50).map(|_| rand::random::()).collect::(), - RecordKind::Chunk, + RecordKind::DataOnly(DataTypes::Chunk), ) { Ok(value) => value.to_vec(), Err(err) => panic!("Cannot generate record value {err:?}"), @@ -1547,7 +1540,7 @@ mod tests { &(0..max_records) .map(|_| rand::random::()) .collect::(), - RecordKind::Chunk, + RecordKind::DataOnly(DataTypes::Chunk), ) { Ok(value) => value.to_vec(), Err(err) => panic!("Cannot generate record value {err:?}"), diff --git a/ant-node/src/metrics.rs b/ant-node/src/metrics.rs index 53c7641db1..f441742570 100644 --- a/ant-node/src/metrics.rs +++ b/ant-node/src/metrics.rs @@ -10,8 +10,9 @@ use crate::Marker; use ant_networking::time::Instant; #[cfg(feature = "open-metrics")] use ant_networking::MetricsRegistries; +use ant_protocol::storage::DataTypes; use prometheus_client::{ - encoding::{EncodeLabelSet, EncodeLabelValue}, + encoding::EncodeLabelSet, metrics::{ counter::Counter, family::Family, @@ -47,14 +48,7 @@ pub(crate) struct NodeMetricsRecorder { #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] struct PutRecordOk { - record_type: RecordType, -} - -#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)] -enum RecordType { - Chunk, - Register, - Spend, + record_type: DataTypes, } impl NodeMetricsRecorder { @@ -157,7 +151,7 @@ impl NodeMetricsRecorder { let _ = self .put_record_ok .get_or_create(&PutRecordOk { - record_type: RecordType::Chunk, + record_type: DataTypes::Chunk, }) .inc(); } @@ -166,7 +160,7 @@ impl NodeMetricsRecorder { let _ = self .put_record_ok .get_or_create(&PutRecordOk { - record_type: RecordType::Register, + record_type: DataTypes::Register, }) .inc(); } @@ -175,7 +169,7 @@ impl NodeMetricsRecorder { let _ = self .put_record_ok .get_or_create(&PutRecordOk { - record_type: RecordType::Spend, + 
record_type: DataTypes::GraphEntry, }) .inc(); } diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 0925c3d1f6..074d9f6ab3 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -15,7 +15,7 @@ use ant_networking::NetworkError; use ant_protocol::storage::GraphEntry; use ant_protocol::{ storage::{ - try_deserialize_record, try_serialize_record, Chunk, GraphEntryAddress, Pointer, + try_deserialize_record, try_serialize_record, Chunk, DataTypes, GraphEntryAddress, Pointer, RecordHeader, RecordKind, Scratchpad, ValidationType, }, NetworkAddress, PrettyPrintRecordKey, @@ -30,7 +30,7 @@ impl Node { let record_header = RecordHeader::from_record(&record)?; match record_header.kind { - RecordKind::ChunkWithPayment => { + RecordKind::DataWithPayment(DataTypes::Chunk) => { let record_key = record.key.clone(); let (payment, chunk) = try_deserialize_record::<(ProofOfPayment, Chunk)>(&record)?; let already_exists = self @@ -87,13 +87,13 @@ impl Node { store_chunk_result } - RecordKind::Chunk => { + RecordKind::DataOnly(DataTypes::Chunk) => { error!("Chunk should not be validated at this point"); Err(Error::InvalidPutWithoutPayment( PrettyPrintRecordKey::from(&record.key).into_owned(), )) } - RecordKind::ScratchpadWithPayment => { + RecordKind::DataWithPayment(DataTypes::Scratchpad) => { let record_key = record.key.clone(); let (payment, scratchpad) = try_deserialize_record::<(ProofOfPayment, Scratchpad)>(&record)?; @@ -148,7 +148,7 @@ impl Node { store_scratchpad_result } - RecordKind::Scratchpad => { + RecordKind::DataOnly(DataTypes::Scratchpad) => { // make sure we already have this scratchpad locally, else reject it as first time upload needs payment let key = record.key.clone(); let scratchpad = try_deserialize_record::(&record)?; @@ -166,14 +166,14 @@ impl Node { self.validate_and_store_scratchpad_record(scratchpad, key, false) .await } - RecordKind::GraphEntry => { + RecordKind::DataOnly(DataTypes::GraphEntry) => 
{ // Transactions should always be paid for error!("Transaction should not be validated at this point"); Err(Error::InvalidPutWithoutPayment( PrettyPrintRecordKey::from(&record.key).into_owned(), )) } - RecordKind::GraphEntryWithPayment => { + RecordKind::DataWithPayment(DataTypes::GraphEntry) => { let (payment, transaction) = try_deserialize_record::<(ProofOfPayment, GraphEntry)>(&record)?; @@ -228,7 +228,7 @@ impl Node { } res } - RecordKind::Register => { + RecordKind::DataOnly(DataTypes::Register) => { let register = try_deserialize_record::(&record)?; // make sure we already have this register locally @@ -267,7 +267,7 @@ impl Node { } result } - RecordKind::RegisterWithPayment => { + RecordKind::DataWithPayment(DataTypes::Register) => { let (payment, register) = try_deserialize_record::<(ProofOfPayment, SignedRegister)>(&record)?; @@ -314,14 +314,14 @@ impl Node { } res } - RecordKind::Pointer => { + RecordKind::DataOnly(DataTypes::Pointer) => { // Pointers should always be paid for error!("Pointer should not be validated at this point"); Err(Error::InvalidPutWithoutPayment( PrettyPrintRecordKey::from(&record.key).into_owned(), )) } - RecordKind::PointerWithPayment => { + RecordKind::DataWithPayment(DataTypes::Pointer) => { let (payment, pointer) = try_deserialize_record::<(ProofOfPayment, Pointer)>(&record)?; @@ -378,18 +378,14 @@ impl Node { debug!("Storing record which was replicated to us {:?}", record.key); let record_header = RecordHeader::from_record(&record)?; match record_header.kind { - // A separate flow handles payment for chunks and registers - RecordKind::ChunkWithPayment - | RecordKind::GraphEntryWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::ScratchpadWithPayment - | RecordKind::PointerWithPayment => { + // A separate flow handles record with payment + RecordKind::DataWithPayment(_) => { warn!("Prepaid record came with Payment, which should be handled in another flow"); Err(Error::UnexpectedRecordWithPayment( 
PrettyPrintRecordKey::from(&record.key).into_owned(), )) } - RecordKind::Chunk => { + RecordKind::DataOnly(DataTypes::Chunk) => { let chunk = try_deserialize_record::(&record)?; let record_key = record.key.clone(); @@ -406,19 +402,19 @@ impl Node { self.store_chunk(&chunk) } - RecordKind::Scratchpad => { + RecordKind::DataOnly(DataTypes::Scratchpad) => { let key = record.key.clone(); let scratchpad = try_deserialize_record::(&record)?; self.validate_and_store_scratchpad_record(scratchpad, key, false) .await } - RecordKind::GraphEntry => { + RecordKind::DataOnly(DataTypes::GraphEntry) => { let record_key = record.key.clone(); let transactions = try_deserialize_record::>(&record)?; self.validate_merge_and_store_transactions(transactions, &record_key) .await } - RecordKind::Register => { + RecordKind::DataOnly(DataTypes::Register) => { let register = try_deserialize_record::(&record)?; // check if the deserialized value's RegisterAddress matches the record's key @@ -432,7 +428,7 @@ impl Node { } self.validate_and_store_register(register, false).await } - RecordKind::Pointer => { + RecordKind::DataOnly(DataTypes::Pointer) => { let pointer = try_deserialize_record::(&record)?; let key = record.key.clone(); self.validate_and_store_pointer_record(pointer, key) @@ -487,7 +483,7 @@ impl Node { let record = Record { key, - value: try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec(), + value: try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk))?.to_vec(), publisher: None, expires: None, }; @@ -551,7 +547,8 @@ impl Node { let record = Record { key: scratchpad_key.clone(), - value: try_serialize_record(&scratchpad, RecordKind::Scratchpad)?.to_vec(), + value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))? 
+ .to_vec(), publisher: None, expires: None, }; @@ -600,7 +597,11 @@ impl Node { // store in kad let record = Record { key: key.clone(), - value: try_serialize_record(&updated_register, RecordKind::Register)?.to_vec(), + value: try_serialize_record( + &updated_register, + RecordKind::DataOnly(DataTypes::Register), + )? + .to_vec(), publisher: None, expires: None, }; @@ -682,7 +683,11 @@ impl Node { // store the record into the local storage let record = Record { key: record_key.clone(), - value: try_serialize_record(&validated_transactions, RecordKind::GraphEntry)?.to_vec(), + value: try_serialize_record( + &validated_transactions, + RecordKind::DataOnly(DataTypes::GraphEntry), + )? + .to_vec(), publisher: None, expires: None, }; @@ -848,9 +853,12 @@ impl Node { // deserialize the record and get the transactions let local_header = RecordHeader::from_record(&local_record)?; let record_kind = local_header.kind; - if !matches!(record_kind, RecordKind::GraphEntry) { + if !matches!(record_kind, RecordKind::DataOnly(DataTypes::GraphEntry)) { error!("Found a {record_kind} when expecting to find Spend at {addr:?}"); - return Err(NetworkError::RecordKindMismatch(RecordKind::GraphEntry).into()); + return Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly( + DataTypes::GraphEntry, + )) + .into()); } let local_transactions: Vec = try_deserialize_record(&local_record)?; Ok(local_transactions) @@ -878,7 +886,8 @@ impl Node { // Store the pointer let record = Record { key: key.clone(), - value: try_serialize_record(&pointer, RecordKind::Pointer)?.to_vec(), + value: try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))? 
+ .to_vec(), publisher: None, expires: None, }; diff --git a/ant-protocol/Cargo.toml b/ant-protocol/Cargo.toml index 4a4011d906..1f83cfd8fb 100644 --- a/ant-protocol/Cargo.toml +++ b/ant-protocol/Cargo.toml @@ -27,6 +27,7 @@ exponential-backoff = "2.0.0" hex = "~0.4.3" lazy_static = "1.4.0" libp2p = { version = "0.54.1", features = ["identify", "kad"] } +prometheus-client = { version = "0.22" } prost = { version = "0.9", optional = true } rand = "0.8" rmp-serde = "1.1.1" diff --git a/ant-protocol/src/storage/header.rs b/ant-protocol/src/storage/header.rs index a67274d5be..4ec619b965 100644 --- a/ant-protocol/src/storage/header.rs +++ b/ant-protocol/src/storage/header.rs @@ -10,11 +10,45 @@ use crate::error::Error; use crate::PrettyPrintRecordKey; use bytes::{BufMut, Bytes, BytesMut}; use libp2p::kad::Record; +use prometheus_client::encoding::EncodeLabelValue; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use std::fmt::Display; use xor_name::XorName; +/// Data types that are natively supported by the autonomi network. +#[derive(EncodeLabelValue, Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Hash)] +pub enum DataTypes { +    Chunk, +    GraphEntry, +    Pointer, +    Register, +    Scratchpad, +} + +impl DataTypes { +    pub fn get_index(&self) -> u32 { +        match self { +            Self::Chunk => 0, +            Self::GraphEntry => 1, +            Self::Pointer => 2, +            Self::Register => 3, +            Self::Scratchpad => 4, +        } +    } + +    pub fn from_index(index: u32) -> Option { +        match index { +            0 => Some(Self::Chunk), +            1 => Some(Self::GraphEntry), +            2 => Some(Self::Pointer), +            3 => Some(Self::Register), +            4 => Some(Self::Scratchpad), +            _ => None, +        } +    } +} + /// Indicates the type of the record content. /// This is to be only used within the node instance to reflect different content version. /// Hence, only need to have two entries: Chunk and NonChunk.
@@ -29,37 +63,28 @@ pub struct RecordHeader { pub kind: RecordKind, } +/// To be used between client and nodes, hence needs to indicate whether payment info is involved. #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum RecordKind { - Chunk, - ChunkWithPayment, - GraphEntry, - GraphEntryWithPayment, - Register, - RegisterWithPayment, - Scratchpad, - ScratchpadWithPayment, - Pointer, - PointerWithPayment, + DataOnly(DataTypes), + DataWithPayment(DataTypes), } +/// Allowing 10 data types to be defined, leaving margin for the future. +pub const RECORD_KIND_PAYMENT_STARTING_INDEX: u32 = 10; + impl Serialize for RecordKind { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { - match *self { - Self::ChunkWithPayment => serializer.serialize_u32(0), - Self::Chunk => serializer.serialize_u32(1), - Self::GraphEntry => serializer.serialize_u32(2), - Self::Register => serializer.serialize_u32(3), - Self::RegisterWithPayment => serializer.serialize_u32(4), - Self::Scratchpad => serializer.serialize_u32(5), - Self::ScratchpadWithPayment => serializer.serialize_u32(6), - Self::GraphEntryWithPayment => serializer.serialize_u32(7), - Self::Pointer => serializer.serialize_u32(8), - Self::PointerWithPayment => serializer.serialize_u32(9), - } + let index = match self { + Self::DataOnly(ref data_types) => data_types.get_index(), + Self::DataWithPayment(ref data_types) => { + RECORD_KIND_PAYMENT_STARTING_INDEX + data_types.get_index() + } + }; + serializer.serialize_u32(index) } } @@ -69,20 +94,22 @@ impl<'de> Deserialize<'de> for RecordKind { D: serde::Deserializer<'de>, { let num = u32::deserialize(deserializer)?; - match num { - 0 => Ok(Self::ChunkWithPayment), - 1 => Ok(Self::Chunk), - 2 => Ok(Self::GraphEntry), - 3 => Ok(Self::Register), - 4 => Ok(Self::RegisterWithPayment), - 5 => Ok(Self::Scratchpad), - 6 => Ok(Self::ScratchpadWithPayment), - 7 => Ok(Self::GraphEntryWithPayment), - 8 => Ok(Self::Pointer), - 9 => Ok(Self::PointerWithPayment), - _ => 
Err(serde::de::Error::custom( - "Unexpected integer for RecordKind variant", - )), + let data_type_index = if num < RECORD_KIND_PAYMENT_STARTING_INDEX { + num + } else { + num - RECORD_KIND_PAYMENT_STARTING_INDEX + }; + + if let Some(data_type) = DataTypes::from_index(data_type_index) { + if num < RECORD_KIND_PAYMENT_STARTING_INDEX { + Ok(Self::DataOnly(data_type)) + } else { + Ok(Self::DataWithPayment(data_type)) + } + } else { + Err(serde::de::Error::custom( + "Unexpected index {num} for RecordKind variant", + )) } } } @@ -126,7 +153,7 @@ impl RecordHeader { pub fn is_record_of_type_chunk(record: &Record) -> Result { let kind = Self::from_record(record)?.kind; - Ok(kind == RecordKind::Chunk) + Ok(kind == RecordKind::DataOnly(DataTypes::Chunk)) } } @@ -165,61 +192,61 @@ pub fn try_serialize_record( #[cfg(test)] mod tests { - use super::{RecordHeader, RecordKind}; + use super::*; use crate::error::Result; #[test] fn verify_record_header_encoded_size() -> Result<()> { let chunk_with_payment = RecordHeader { - kind: RecordKind::ChunkWithPayment, + kind: RecordKind::DataWithPayment(DataTypes::Chunk), } .try_serialize()?; assert_eq!(chunk_with_payment.len(), RecordHeader::SIZE); let reg_with_payment = RecordHeader { - kind: RecordKind::RegisterWithPayment, + kind: RecordKind::DataWithPayment(DataTypes::Register), } .try_serialize()?; assert_eq!(reg_with_payment.len(), RecordHeader::SIZE); let chunk = RecordHeader { - kind: RecordKind::Chunk, + kind: RecordKind::DataOnly(DataTypes::Chunk), } .try_serialize()?; assert_eq!(chunk.len(), RecordHeader::SIZE); let transaction = RecordHeader { - kind: RecordKind::GraphEntry, + kind: RecordKind::DataOnly(DataTypes::GraphEntry), } .try_serialize()?; assert_eq!(transaction.len(), RecordHeader::SIZE); let register = RecordHeader { - kind: RecordKind::Register, + kind: RecordKind::DataOnly(DataTypes::Register), } .try_serialize()?; assert_eq!(register.len(), RecordHeader::SIZE); let scratchpad = RecordHeader { - kind: 
RecordKind::Scratchpad, + kind: RecordKind::DataOnly(DataTypes::Scratchpad), } .try_serialize()?; assert_eq!(scratchpad.len(), RecordHeader::SIZE); let scratchpad_with_payment = RecordHeader { - kind: RecordKind::ScratchpadWithPayment, + kind: RecordKind::DataWithPayment(DataTypes::Scratchpad), } .try_serialize()?; assert_eq!(scratchpad_with_payment.len(), RecordHeader::SIZE); let pointer = RecordHeader { - kind: RecordKind::Pointer, + kind: RecordKind::DataOnly(DataTypes::Pointer), } .try_serialize()?; assert_eq!(pointer.len(), RecordHeader::SIZE); let pointer_with_payment = RecordHeader { - kind: RecordKind::PointerWithPayment, + kind: RecordKind::DataWithPayment(DataTypes::Pointer), } .try_serialize()?; assert_eq!(pointer_with_payment.len(), RecordHeader::SIZE); @@ -230,16 +257,16 @@ mod tests { #[test] fn test_record_kind_serialization() -> Result<()> { let kinds = vec![ - RecordKind::Chunk, - RecordKind::ChunkWithPayment, - RecordKind::GraphEntry, - RecordKind::GraphEntryWithPayment, - RecordKind::Register, - RecordKind::RegisterWithPayment, - RecordKind::Scratchpad, - RecordKind::ScratchpadWithPayment, - RecordKind::Pointer, - RecordKind::PointerWithPayment, + RecordKind::DataOnly(DataTypes::Chunk), + RecordKind::DataWithPayment(DataTypes::Chunk), + RecordKind::DataOnly(DataTypes::GraphEntry), + RecordKind::DataWithPayment(DataTypes::GraphEntry), + RecordKind::DataOnly(DataTypes::Register), + RecordKind::DataWithPayment(DataTypes::Register), + RecordKind::DataOnly(DataTypes::Scratchpad), + RecordKind::DataWithPayment(DataTypes::Scratchpad), + RecordKind::DataOnly(DataTypes::Pointer), + RecordKind::DataWithPayment(DataTypes::Pointer), ]; for kind in kinds { diff --git a/ant-protocol/src/storage/mod.rs b/ant-protocol/src/storage/mod.rs index 033aaab757..f23d9bca7e 100644 --- a/ant-protocol/src/storage/mod.rs +++ b/ant-protocol/src/storage/mod.rs @@ -23,7 +23,8 @@ pub use self::{ chunks::Chunk, graph::GraphEntry, header::{ - try_deserialize_record, 
try_serialize_record, RecordHeader, RecordKind, ValidationType, + try_deserialize_record, try_serialize_record, DataTypes, RecordHeader, RecordKind, + ValidationType, }, scratchpad::Scratchpad, }; diff --git a/autonomi/src/client/data/public.rs b/autonomi/src/client/data/public.rs index eff9f99f89..8b434c5a98 100644 --- a/autonomi/src/client/data/public.rs +++ b/autonomi/src/client/data/public.rs @@ -17,7 +17,7 @@ use crate::{self_encryption::encrypt, Client}; use ant_evm::{Amount, AttoTokens}; use ant_networking::{GetRecordCfg, NetworkError}; use ant_protocol::{ - storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind}, + storage::{try_deserialize_record, Chunk, ChunkAddress, DataTypes, RecordHeader, RecordKind}, NetworkAddress, }; @@ -129,7 +129,7 @@ impl Client { .inspect_err(|err| error!("Error fetching chunk: {err:?}"))?; let header = RecordHeader::from_record(&record)?; - if let RecordKind::Chunk = header.kind { + if let Ok(true) = RecordHeader::is_record_of_type_chunk(&record) { let chunk: Chunk = try_deserialize_record(&record)?; Ok(chunk) } else { @@ -137,7 +137,7 @@ impl Client { "Record kind mismatch: expected Chunk, got {:?}", header.kind ); - Err(NetworkError::RecordKindMismatch(RecordKind::Chunk).into()) + Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly(DataTypes::Chunk)).into()) } } diff --git a/autonomi/src/client/graph.rs b/autonomi/src/client/graph.rs index 71749b3289..cf78903209 100644 --- a/autonomi/src/client/graph.rs +++ b/autonomi/src/client/graph.rs @@ -20,7 +20,7 @@ pub use bls::SecretKey; use ant_evm::{EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; use ant_protocol::{ - storage::{try_serialize_record, RecordKind, RetryStrategy}, + storage::{try_serialize_record, DataTypes, RecordKind, RetryStrategy}, NetworkAddress, }; use libp2p::kad::{Quorum, Record}; @@ -89,9 +89,12 @@ impl Client { let payees = proof.payees(); let record = Record { key: 
NetworkAddress::from_graph_entry_address(address).to_record_key(), - value: try_serialize_record(&(proof, &transaction), RecordKind::GraphEntryWithPayment) - .map_err(|_| GraphError::Serialization)? - .to_vec(), + value: try_serialize_record( + &(proof, &transaction), + RecordKind::DataWithPayment(DataTypes::GraphEntry), + ) + .map_err(|_| GraphError::Serialization)? + .to_vec(), publisher: None, expires: None, }; diff --git a/autonomi/src/client/pointer.rs b/autonomi/src/client/pointer.rs index dd759209f5..a5f95e18f8 100644 --- a/autonomi/src/client/pointer.rs +++ b/autonomi/src/client/pointer.rs @@ -5,7 +5,9 @@ use tracing::{debug, error, trace}; use ant_evm::{Amount, AttoTokens, EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; use ant_protocol::{ - storage::{try_serialize_record, Pointer, PointerAddress, RecordKind, RetryStrategy}, + storage::{ + try_serialize_record, DataTypes, Pointer, PointerAddress, RecordKind, RetryStrategy, + }, NetworkAddress, }; use bls::SecretKey; @@ -80,9 +82,12 @@ impl Client { let record = Record { key: NetworkAddress::from_pointer_address(address).to_record_key(), - value: try_serialize_record(&(proof, &pointer), RecordKind::PointerWithPayment) - .map_err(|_| PointerError::Serialization)? - .to_vec(), + value: try_serialize_record( + &(proof, &pointer), + RecordKind::DataWithPayment(DataTypes::Pointer), + ) + .map_err(|_| PointerError::Serialization)? 
+ .to_vec(), publisher: None, expires: None, }; diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index dc56e37b45..d28fbacf2f 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -19,7 +19,7 @@ pub use bls::SecretKey as RegisterSecretKey; use ant_evm::{Amount, AttoTokens, EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg, VerificationKind}; use ant_protocol::{ - storage::{try_deserialize_record, try_serialize_record, RecordKind, RetryStrategy}, + storage::{try_deserialize_record, try_serialize_record, DataTypes, RecordKind, RetryStrategy}, NetworkAddress, }; use ant_registers::Register as BaseRegister; @@ -204,9 +204,12 @@ impl Client { // Prepare the record for network storage let record = Record { key: NetworkAddress::from_register_address(*register.address()).to_record_key(), - value: try_serialize_record(&signed_register, RecordKind::Register) - .map_err(|_| RegisterError::Serialization)? - .to_vec(), + value: try_serialize_record( + &signed_register, + RecordKind::DataOnly(DataTypes::Register), + ) + .map_err(|_| RegisterError::Serialization)? + .to_vec(), publisher: None, expires: None, }; @@ -337,7 +340,7 @@ impl Client { key: NetworkAddress::from_register_address(*address).to_record_key(), value: try_serialize_record( &(proof, &signed_register), - RecordKind::RegisterWithPayment, + RecordKind::DataWithPayment(DataTypes::Register), ) .map_err(|_| RegisterError::Serialization)? 
.to_vec(), diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index ad2aeececb..2a8eb70e3e 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -11,7 +11,7 @@ use ant_evm::{EvmWallet, ProofOfPayment}; use ant_networking::{GetRecordCfg, PutRecordCfg, VerificationKind}; use ant_protocol::{ messages::ChunkProof, - storage::{try_serialize_record, Chunk, RecordKind, RetryStrategy}, + storage::{try_serialize_record, Chunk, DataTypes, RecordKind, RetryStrategy}, }; use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; @@ -110,7 +110,7 @@ impl Client { let key = chunk.network_address().to_record_key(); - let record_kind = RecordKind::ChunkWithPayment; + let record_kind = RecordKind::DataWithPayment(DataTypes::Chunk); let record = Record { key: key.clone(), value: try_serialize_record(&(payment, chunk.clone()), record_kind) @@ -133,9 +133,12 @@ impl Client { is_register: false, }; - let stored_on_node = try_serialize_record(&chunk, RecordKind::Chunk) - .map_err(|e| PutError::Serialization(format!("Failed to serialize chunk: {e:?}")))? - .to_vec(); + let stored_on_node = + try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk)) + .map_err(|e| { + PutError::Serialization(format!("Failed to serialize chunk: {e:?}")) + })? 
+ .to_vec(); let random_nonce = thread_rng().gen::(); let expected_proof = ChunkProof::new(&stored_on_node, random_nonce); diff --git a/autonomi/src/client/vault.rs b/autonomi/src/client/vault.rs index 462e2a4cb0..d57c197fc6 100644 --- a/autonomi/src/client/vault.rs +++ b/autonomi/src/client/vault.rs @@ -19,7 +19,7 @@ use crate::client::Client; use ant_evm::{Amount, AttoTokens}; use ant_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg, VerificationKind}; use ant_protocol::storage::{ - try_serialize_record, RecordKind, RetryStrategy, Scratchpad, ScratchpadAddress, + try_serialize_record, DataTypes, RecordKind, RetryStrategy, Scratchpad, ScratchpadAddress, }; use ant_protocol::Bytes; use ant_protocol::{storage::try_deserialize_record, NetworkAddress}; @@ -208,20 +208,23 @@ impl Client { Record { key: scratch_key, - value: try_serialize_record(&(proof, scratch), RecordKind::ScratchpadWithPayment) - .map_err(|_| { - PutError::Serialization( - "Failed to serialize scratchpad with payment".to_string(), - ) - })? - .to_vec(), + value: try_serialize_record( + &(proof, scratch), + RecordKind::DataWithPayment(DataTypes::Scratchpad), + ) + .map_err(|_| { + PutError::Serialization( + "Failed to serialize scratchpad with payment".to_string(), + ) + })? + .to_vec(), publisher: None, expires: None, } } else { Record { key: scratch_key, - value: try_serialize_record(&scratch, RecordKind::Scratchpad) + value: try_serialize_record(&scratch, RecordKind::DataOnly(DataTypes::Scratchpad)) .map_err(|_| { PutError::Serialization("Failed to serialize scratchpad".to_string()) })?