diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index 1475f97740..b56febac14 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -103,6 +103,8 @@ pub enum LocalSwarmCmd { /// Returns the quoting metrics and whether the record at `key` is already stored locally GetLocalQuotingMetrics { key: RecordKey, + data_type: u32, + data_size: usize, sender: oneshot::Sender<(QuotingMetrics, bool)>, }, /// Notify the node received a payment. @@ -575,7 +577,12 @@ impl SwarmDriver { cmd_string = "TriggerIntervalReplication"; self.try_interval_replication()?; } - LocalSwarmCmd::GetLocalQuotingMetrics { key, sender } => { + LocalSwarmCmd::GetLocalQuotingMetrics { + key, + data_type, + data_size, + sender, + } => { cmd_string = "GetLocalQuotingMetrics"; let ( _index, @@ -591,7 +598,12 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .quoting_metrics(&key, Some(estimated_network_size as u64)); + .quoting_metrics( + &key, + data_type, + data_size, + Some(estimated_network_size as u64), + ); self.record_metrics(Marker::QuotingMetrics { quoting_metrics: "ing_metrics, diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index 2d558fd9f6..c3184156ed 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -378,6 +378,8 @@ impl Network { pub async fn get_store_quote_from_network( &self, record_address: NetworkAddress, + data_type: u32, + data_size: usize, ignore_peers: Vec, ) -> Result> { // The requirement of having at least CLOSE_GROUP_SIZE @@ -400,6 +402,8 @@ impl Network { // Client shall decide whether to carry out storage verification or not. let request = Request::Query(Query::GetStoreQuote { key: record_address.clone(), + data_type, + data_size, nonce: None, difficulty: 0, }); @@ -810,9 +814,16 @@ impl Network { pub async fn get_local_quoting_metrics( &self, key: RecordKey, + data_type: u32, + data_size: usize, ) -> Result<(QuotingMetrics, bool)> { let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics { key, sender }); + self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics { + key, + data_type, + data_size, + sender, + }); receiver .await diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index ef32b98381..91dd8e2c43 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -714,6 +714,8 @@ impl NodeRecordStore { pub(crate) fn quoting_metrics( &self, key: &Key, + data_type: u32, + data_size: usize, network_size: Option, ) -> (QuotingMetrics, bool) { let records_stored = self.records.len(); @@ -725,6 +727,8 @@ impl NodeRecordStore { }; let mut quoting_metrics = QuotingMetrics { + data_type, + data_size, close_records_stored: records_stored, max_records: self.config.max_records, received_payment_count: self.received_payment_count, diff --git a/ant-networking/src/record_store_api.rs b/ant-networking/src/record_store_api.rs index 7db4f38e54..755571800c 100644 --- a/ant-networking/src/record_store_api.rs +++ b/ant-networking/src/record_store_api.rs @@ -118,6 +118,8 @@ impl UnifiedRecordStore { pub(crate) fn quoting_metrics( &self, key: &RecordKey, + data_type: u32, + data_size: usize, network_size: Option, ) -> (QuotingMetrics, bool) { match self { @@ -125,7 +127,7 @@ impl UnifiedRecordStore { warn!("Calling quoting metrics calculation at Client. This should not happen"); Default::default() } - Self::Node(store) => store.quoting_metrics(key, network_size), + Self::Node(store) => store.quoting_metrics(key, data_type, data_size, network_size), } } diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index ba83779c84..81395821f4 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -574,6 +574,8 @@ impl Node { let resp: QueryResponse = match query { Query::GetStoreQuote { key, + data_type, + data_size, nonce, difficulty, } => { @@ -581,8 +583,9 @@ impl Node { let record_key = key.to_record_key(); let self_id = network.peer_id(); - let maybe_quoting_metrics = - network.get_local_quoting_metrics(record_key.clone()).await; + let maybe_quoting_metrics = network + .get_local_quoting_metrics(record_key.clone(), data_type, data_size) + .await; let storage_proofs = if let Some(nonce) = nonce { Self::respond_x_closest_record_proof( diff --git a/ant-protocol/src/messages/query.rs b/ant-protocol/src/messages/query.rs index d395274037..196cff9b7b 100644 --- a/ant-protocol/src/messages/query.rs +++ b/ant-protocol/src/messages/query.rs @@ -23,6 +23,10 @@ pub enum Query { GetStoreQuote { /// The Address of the record to be stored. key: NetworkAddress, + /// DataTypes as represented as its `index` + data_type: u32, + /// Data size of the record + data_size: usize, /// The random nonce that nodes use to produce the Proof (i.e., hash(record+nonce)) /// Set to None if no need to carry out storage check. nonce: Option, @@ -101,10 +105,15 @@ impl std::fmt::Display for Query { match self { Query::GetStoreQuote { key, + data_type, + data_size, nonce, difficulty, } => { - write!(f, "Query::GetStoreQuote({key:?} {nonce:?} {difficulty})") + write!( + f, + "Query::GetStoreQuote({key:?} {data_type} {data_size} {nonce:?} {difficulty})" + ) } Query::GetReplicatedRecord { key, requester } => { write!(f, "Query::GetReplicatedRecord({requester:?} {key:?})") diff --git a/autonomi/src/client/data/mod.rs b/autonomi/src/client/data/mod.rs index 5fc1eb3290..066c578585 100644 --- a/autonomi/src/client/data/mod.rs +++ b/autonomi/src/client/data/mod.rs @@ -11,7 +11,7 @@ use std::sync::LazyLock; use ant_evm::{Amount, EvmWalletError}; use ant_networking::NetworkError; -use ant_protocol::storage::Chunk; +use ant_protocol::storage::{Chunk, DataTypes}; use ant_protocol::NetworkAddress; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -220,10 +220,17 @@ impl Client { debug!("Encryption took: {:.2?}", now.elapsed()); // Pay for all chunks - let xor_names: Vec<_> = chunks.iter().map(|chunk| *chunk.name()).collect(); + let xor_names: Vec<_> = chunks + .iter() + .map(|chunk| (*chunk.name(), chunk.serialised_size())) + .collect(); info!("Paying for {} addresses", xor_names.len()); let (receipt, skipped_payments) = self - .pay_for_content_addrs(xor_names.into_iter(), payment_option) + .pay_for_content_addrs( + DataTypes::Chunk.get_index(), + xor_names.into_iter(), + payment_option, + ) .await .inspect_err(|err| error!("Error paying for data: {err:?}"))?; diff --git a/autonomi/src/client/data/public.rs b/autonomi/src/client/data/public.rs index fde2908964..03d17df7c3 100644 --- a/autonomi/src/client/data/public.rs +++ b/autonomi/src/client/data/public.rs @@ -51,16 +51,20 @@ impl Client { info!("Uploading datamap chunk to the network at: {data_map_addr:?}"); let map_xor_name = *data_map_chunk.address().xorname(); - let mut xor_names = vec![map_xor_name]; + let mut xor_names = vec![(map_xor_name, data_map_chunk.serialised_size())]; for chunk in &chunks { - xor_names.push(*chunk.name()); + xor_names.push((*chunk.name(), chunk.serialised_size())); } // Pay for all chunks + data map chunk info!("Paying for {} addresses", xor_names.len()); let (receipt, skipped_payments) = self - .pay_for_content_addrs(xor_names.into_iter(), payment_option) + .pay_for_content_addrs( + DataTypes::Chunk.get_index(), + xor_names.into_iter(), + payment_option, + ) .await .inspect_err(|err| error!("Error paying for data: {err:?}"))?; @@ -145,15 +149,15 @@ impl Client { /// Get the estimated cost of storing a piece of data. pub async fn data_cost(&self, data: Bytes) -> Result { let now = ant_networking::time::Instant::now(); - let (data_map_chunk, chunks) = encrypt(data)?; + let (data_map_chunks, chunks) = encrypt(data)?; debug!("Encryption took: {:.2?}", now.elapsed()); - let map_xor_name = *data_map_chunk.address().xorname(); - let mut content_addrs = vec![map_xor_name]; + let map_xor_name = *data_map_chunks.address().xorname(); + let mut content_addrs = vec![(map_xor_name, data_map_chunks.serialised_size())]; for chunk in &chunks { - content_addrs.push(*chunk.name()); + content_addrs.push((*chunk.name(), chunk.serialised_size())); } info!( @@ -162,7 +166,7 @@ impl Client { ); let store_quote = self - .get_store_quotes(content_addrs.into_iter()) + .get_store_quotes(DataTypes::Chunk.get_index(), content_addrs.into_iter()) .await .inspect_err(|err| error!("Error getting store quotes: {err:?}"))?; diff --git a/autonomi/src/client/external_signer.rs b/autonomi/src/client/external_signer.rs index 4309dba99f..b05e958422 100644 --- a/autonomi/src/client/external_signer.rs +++ b/autonomi/src/client/external_signer.rs @@ -17,7 +17,8 @@ impl Client { /// Returns a cost map, data payments to be executed and a list of free (already paid for) chunks. pub async fn get_quotes_for_content_addresses( &self, - content_addrs: impl Iterator + Clone, + data_type: u32, + content_addrs: impl Iterator + Clone, ) -> Result< ( HashMap, @@ -26,14 +27,20 @@ impl Client { ), PutError, > { - let quote = self.get_store_quotes(content_addrs.clone()).await?; + let quote = self + .get_store_quotes(data_type, content_addrs.clone()) + .await?; let payments = quote.payments(); - let free_chunks = content_addrs - .filter(|addr| !quote.0.contains_key(addr)) + let free_chunks: Vec<_> = content_addrs + .filter(|(addr, _)| !quote.0.contains_key(addr)) .collect(); let quotes_per_addr: HashMap<_, _> = quote.0.into_iter().collect(); - Ok((quotes_per_addr, payments, free_chunks)) + Ok(( + quotes_per_addr, + payments, + free_chunks.iter().map(|(addr, _)| *addr).collect(), + )) } } diff --git a/autonomi/src/client/graph.rs b/autonomi/src/client/graph.rs index 6b4db4b060..0cf2be0164 100644 --- a/autonomi/src/client/graph.rs +++ b/autonomi/src/client/graph.rs @@ -70,7 +70,11 @@ impl Client { let xor_name = address.xorname(); debug!("Paying for transaction at address: {address:?}"); let (payment_proofs, skipped_payments) = self - .pay(std::iter::once(*xor_name), wallet) + .pay( + DataTypes::GraphEntry.get_index(), + std::iter::once((*xor_name, entry.bytes_for_signature().len())), + wallet, + ) .await .inspect_err(|err| { error!("Failed to pay for transaction at address: {address:?} : {err}") @@ -144,7 +148,13 @@ impl Client { let address = GraphEntryAddress::from_owner(pk); let xor = *address.xorname(); - let store_quote = self.get_store_quotes(std::iter::once(xor)).await?; + // TODO: define default size of GraphEntry + let store_quote = self + .get_store_quotes( + DataTypes::GraphEntry.get_index(), + std::iter::once((xor, 512)), + ) + .await?; let total_cost = AttoTokens::from_atto( store_quote .0 diff --git a/autonomi/src/client/payment.rs b/autonomi/src/client/payment.rs index b6dc4c936e..2e693088cb 100644 --- a/autonomi/src/client/payment.rs +++ b/autonomi/src/client/payment.rs @@ -65,12 +65,13 @@ impl From for PaymentOption { impl Client { pub(crate) async fn pay_for_content_addrs( &self, - content_addrs: impl Iterator + Clone, + data_type: u32, + content_addrs: impl Iterator + Clone, payment_option: PaymentOption, ) -> Result<(Receipt, AlreadyPaidAddressesCount), PayError> { match payment_option { PaymentOption::Wallet(wallet) => { - let (receipt, skipped) = self.pay(content_addrs, &wallet).await?; + let (receipt, skipped) = self.pay(data_type, content_addrs, &wallet).await?; Ok((receipt, skipped)) } PaymentOption::Receipt(receipt) => Ok((receipt, 0)), diff --git a/autonomi/src/client/pointer.rs b/autonomi/src/client/pointer.rs index eb65128eae..5021437aeb 100644 --- a/autonomi/src/client/pointer.rs +++ b/autonomi/src/client/pointer.rs @@ -63,7 +63,12 @@ impl Client { let xor_name = *address.xorname(); debug!("Paying for pointer at address: {address:?}"); let (payment_proofs, _skipped_payments) = self - .pay(std::iter::once(xor_name), wallet) + // TODO: define Pointer default size for pricing + .pay( + DataTypes::Pointer.get_index(), + std::iter::once((xor_name, 128)), + wallet, + ) .await .inspect_err(|err| { error!("Failed to pay for pointer at address: {address:?} : {err}") @@ -126,7 +131,10 @@ impl Client { let address = PointerAddress::from_owner(pk); let xor = *address.xorname(); - let store_quote = self.get_store_quotes(std::iter::once(xor)).await?; + // TODO: define default size of Pointer + let store_quote = self + .get_store_quotes(DataTypes::Pointer.get_index(), std::iter::once((xor, 128))) + .await?; let total_cost = AttoTokens::from_atto( store_quote .0 diff --git a/autonomi/src/client/quote.rs b/autonomi/src/client/quote.rs index b89e1bbf34..a98f64d050 100644 --- a/autonomi/src/client/quote.rs +++ b/autonomi/src/client/quote.rs @@ -55,12 +55,15 @@ impl StoreQuote { impl Client { pub async fn get_store_quotes( &self, - content_addrs: impl Iterator, + data_type: u32, + content_addrs: impl Iterator, ) -> Result { // get all quotes from nodes let futures: Vec<_> = content_addrs .into_iter() - .map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr)) + .map(|(content_addr, data_size)| { + fetch_store_quote_with_retries(&self.network, content_addr, data_type, data_size) + }) .collect(); let raw_quotes_per_addr = futures::future::try_join_all(futures).await?; @@ -149,10 +152,14 @@ impl Client { async fn fetch_store_quote( network: &Network, content_addr: XorName, + data_type: u32, + data_size: usize, ) -> Result, NetworkError> { network .get_store_quote_from_network( NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)), + data_type, + data_size, vec![], ) .await @@ -162,11 +169,13 @@ async fn fetch_store_quote( async fn fetch_store_quote_with_retries( network: &Network, content_addr: XorName, + data_type: u32, + data_size: usize, ) -> Result<(XorName, Vec<(PeerId, PaymentQuote)>), CostError> { let mut retries = 0; loop { - match fetch_store_quote(network, content_addr).await { + match fetch_store_quote(network, content_addr, data_type, data_size).await { Ok(quote) => { if quote.len() < CLOSE_GROUP_SIZE { retries += 1; diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index ee8514e7b5..4c5719ee48 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -261,8 +261,13 @@ impl Client { let reg_xor = register.address().xorname(); // get cost to store register - // NB TODO: register should be priced differently from other data - let store_quote = self.get_store_quotes(std::iter::once(reg_xor)).await?; + // TODO: define default size of Register + let store_quote = self + .get_store_quotes( + DataTypes::Register.get_index(), + std::iter::once((reg_xor, 256)), + ) + .await?; let total_cost = AttoTokens::from_atto( store_quote @@ -320,11 +325,16 @@ impl Client { let reg_xor = address.xorname(); debug!("Paying for register at address: {address}"); let (payment_proofs, skipped_payments) = self - .pay(std::iter::once(reg_xor), wallet) + // TODO: define Register default size for pricing + .pay( + DataTypes::Register.get_index(), + std::iter::once((reg_xor, 256)), + wallet, + ) .await .inspect_err(|err| { - error!("Failed to pay for register at address: {address} : {err}") - })?; + error!("Failed to pay for register at address: {address} : {err}") + })?; let (proof, price) = if let Some((proof, price)) = payment_proofs.get(®_xor) { (proof, price) } else { diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index ae0cac0b48..e17783cb29 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -167,11 +167,12 @@ impl Client { /// Pay for the chunks and get the proof of payment. pub(crate) async fn pay( &self, - content_addrs: impl Iterator + Clone, + data_type: u32, + content_addrs: impl Iterator + Clone, wallet: &EvmWallet, ) -> Result<(Receipt, AlreadyPaidAddressesCount), PayError> { let number_of_content_addrs = content_addrs.clone().count(); - let quotes = self.get_store_quotes(content_addrs).await?; + let quotes = self.get_store_quotes(data_type, content_addrs).await?; // Make sure nobody else can use the wallet while we are paying debug!("Waiting for wallet lock"); diff --git a/autonomi/src/client/vault.rs b/autonomi/src/client/vault.rs index 45fa3e2256..2ffd0a8300 100644 --- a/autonomi/src/client/vault.rs +++ b/autonomi/src/client/vault.rs @@ -151,8 +151,13 @@ impl Client { let scratch = Scratchpad::new(client_pk, content_type); let vault_xor = scratch.address().xorname(); - // NB TODO: vault should be priced differently from other data - let store_quote = self.get_store_quotes(std::iter::once(vault_xor)).await?; + // TODO: define default size of Scratchpad + let store_quote = self + .get_store_quotes( + DataTypes::Scratchpad.get_index(), + std::iter::once((vault_xor, 256)), + ) + .await?; let total_cost = AttoTokens::from_atto( store_quote @@ -193,7 +198,11 @@ impl Client { let record = if is_new { let (receipt, _skipped_payments) = self - .pay_for_content_addrs(std::iter::once(scratch.xorname()), payment_option) + .pay_for_content_addrs( + DataTypes::Scratchpad.get_index(), + std::iter::once((scratch.xorname(), scratch.payload_size())), + payment_option, + ) .await .inspect_err(|err| { error!("Failed to pay for new vault at addr: {scratch_address:?} : {err}"); diff --git a/autonomi/tests/external_signer.rs b/autonomi/tests/external_signer.rs index 6430132d5d..59190c6c9d 100644 --- a/autonomi/tests/external_signer.rs +++ b/autonomi/tests/external_signer.rs @@ -4,6 +4,7 @@ use alloy::network::TransactionBuilder; use alloy::providers::Provider; use ant_evm::{QuoteHash, TxHash}; use ant_logging::LogBuilder; +use ant_protocol::storage::DataTypes; use autonomi::client::external_signer::encrypt_data; use autonomi::client::files::archive::{Metadata, PrivateArchive}; use autonomi::client::payment::{receipt_from_store_quotes, Receipt}; @@ -23,22 +24,29 @@ async fn pay_for_data(client: &Client, wallet: &Wallet, data: Bytes) -> eyre::Re let (data_map_chunk, chunks) = encrypt_data(data)?; let map_xor_name = *data_map_chunk.address().xorname(); - let mut xor_names = vec![map_xor_name]; + let mut xor_names = vec![(map_xor_name, data_map_chunk.serialised_size())]; for chunk in chunks { - xor_names.push(*chunk.name()); + xor_names.push((*chunk.name(), chunk.serialised_size())); } - pay_for_content_addresses(client, wallet, xor_names.into_iter()).await + pay_for_content_addresses( + client, + wallet, + DataTypes::Chunk.get_index(), + xor_names.into_iter(), + ) + .await } async fn pay_for_content_addresses( client: &Client, wallet: &Wallet, - content_addrs: impl Iterator + Clone, + data_types: u32, + content_addrs: impl Iterator + Clone, ) -> eyre::Result { let (quotes, quote_payments, _free_chunks) = client - .get_quotes_for_content_addresses(content_addrs) + .get_quotes_for_content_addresses(data_types, content_addrs) .await?; // Form quotes payment transaction data @@ -147,13 +155,18 @@ async fn external_signer_put() -> eyre::Result<()> { assert!(is_new, "Scratchpad is not new"); let scratch_addresses = if is_new { - vec![scratch.xorname()] + vec![(scratch.xorname(), scratch.payload_size())] } else { vec![] }; - let receipt = - pay_for_content_addresses(&client, &wallet, scratch_addresses.into_iter()).await?; + let receipt = pay_for_content_addresses( + &client, + &wallet, + DataTypes::Scratchpad.get_index(), + scratch_addresses.into_iter(), + ) + .await?; sleep(Duration::from_secs(5)).await; diff --git a/evmlib/src/quoting_metrics.rs b/evmlib/src/quoting_metrics.rs index c4971a1b03..187e5fd416 100644 --- a/evmlib/src/quoting_metrics.rs +++ b/evmlib/src/quoting_metrics.rs @@ -13,6 +13,10 @@ use std::fmt::{Debug, Formatter, Result as FmtResult}; /// Quoting metrics used to generate a quote, or to track peer's status. #[derive(Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct QuotingMetrics { + /// DataTypes presented as its `index` + pub data_type: u32, + /// data size of the record + pub data_size: usize, /// the records stored pub close_records_stored: usize, /// the max_records configured @@ -32,6 +36,9 @@ impl QuotingMetrics { /// construct an empty QuotingMetrics pub fn new() -> Self { Self { + // Default to be charged as a `Chunk` + data_type: 0, + data_size: 0, close_records_stored: 0, max_records: 0, received_payment_count: 0, @@ -52,7 +59,7 @@ impl Debug for QuotingMetrics { fn fmt(&self, formatter: &mut Formatter) -> FmtResult { let density_u256 = self.network_density.map(U256::from_be_bytes); - write!(formatter, "QuotingMetrics {{ close_records_stored: {}, max_records: {}, received_payment_count: {}, live_time: {}, network_density: {density_u256:?}, network_size: {:?} }}", - self.close_records_stored, self.max_records, self.received_payment_count, self.live_time, self.network_size) + write!(formatter, "QuotingMetrics {{ data_type: {}, data_size: {}, close_records_stored: {}, max_records: {}, received_payment_count: {}, live_time: {}, network_density: {density_u256:?}, network_size: {:?} }}", + self.data_type, self.data_size, self.close_records_stored, self.max_records, self.received_payment_count, self.live_time, self.network_size) } } diff --git a/evmlib/tests/payment_vault.rs b/evmlib/tests/payment_vault.rs index b9437c6f6d..8f30f6523a 100644 --- a/evmlib/tests/payment_vault.rs +++ b/evmlib/tests/payment_vault.rs @@ -136,6 +136,8 @@ async fn test_get_quote_on_arb_sepolia_test() { let payment_vault = PaymentVaultHandler::new(*network.data_payments_address(), provider); let quoting_metrics = QuotingMetrics { + data_type: 1, // a GraphEntry record + data_size: 100, close_records_stored: 10, max_records: 16 * 1024, received_payment_count: 0,