Skip to content

Commit

Permalink
feat(pricing): make QuotingMetrics supports data_type and data_size p…
Browse files Browse the repository at this point in the history
…ricing
  • Loading branch information
maqi committed Jan 15, 2025
1 parent 2456e5e commit 9135fab
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 52 deletions.
16 changes: 14 additions & 2 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub enum LocalSwarmCmd {
/// Returns the quoting metrics and whether the record at `key` is already stored locally
GetLocalQuotingMetrics {
key: RecordKey,
data_type: u32,
data_size: usize,
sender: oneshot::Sender<(QuotingMetrics, bool)>,
},
/// Notify the node received a payment.
Expand Down Expand Up @@ -575,7 +577,12 @@ impl SwarmDriver {
cmd_string = "TriggerIntervalReplication";
self.try_interval_replication()?;
}
LocalSwarmCmd::GetLocalQuotingMetrics { key, sender } => {
LocalSwarmCmd::GetLocalQuotingMetrics {
key,
data_type,
data_size,
sender,
} => {
cmd_string = "GetLocalQuotingMetrics";
let (
_index,
Expand All @@ -591,7 +598,12 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.quoting_metrics(&key, Some(estimated_network_size as u64));
.quoting_metrics(
&key,
data_type,
data_size,
Some(estimated_network_size as u64),
);

self.record_metrics(Marker::QuotingMetrics {
quoting_metrics: &quoting_metrics,
Expand Down
13 changes: 12 additions & 1 deletion ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ impl Network {
pub async fn get_store_quote_from_network(
&self,
record_address: NetworkAddress,
data_type: u32,
data_size: usize,
ignore_peers: Vec<PeerId>,
) -> Result<Vec<(PeerId, PaymentQuote)>> {
// The requirement of having at least CLOSE_GROUP_SIZE
Expand All @@ -400,6 +402,8 @@ impl Network {
// Client shall decide whether to carry out storage verification or not.
let request = Request::Query(Query::GetStoreQuote {
key: record_address.clone(),
data_type,
data_size,
nonce: None,
difficulty: 0,
});
Expand Down Expand Up @@ -810,9 +814,16 @@ impl Network {
pub async fn get_local_quoting_metrics(
&self,
key: RecordKey,
data_type: u32,
data_size: usize,
) -> Result<(QuotingMetrics, bool)> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics { key, sender });
self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
key,
data_type,
data_size,
sender,
});

receiver
.await
Expand Down
4 changes: 4 additions & 0 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ impl NodeRecordStore {
pub(crate) fn quoting_metrics(
&self,
key: &Key,
data_type: u32,
data_size: usize,
network_size: Option<u64>,
) -> (QuotingMetrics, bool) {
let records_stored = self.records.len();
Expand All @@ -725,6 +727,8 @@ impl NodeRecordStore {
};

let mut quoting_metrics = QuotingMetrics {
data_type,
data_size,
close_records_stored: records_stored,
max_records: self.config.max_records,
received_payment_count: self.received_payment_count,
Expand Down
4 changes: 3 additions & 1 deletion ant-networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ impl UnifiedRecordStore {
pub(crate) fn quoting_metrics(
&self,
key: &RecordKey,
data_type: u32,
data_size: usize,
network_size: Option<u64>,
) -> (QuotingMetrics, bool) {
match self {
Self::Client(_) => {
warn!("Calling quoting metrics calculation at Client. This should not happen");
Default::default()
}
Self::Node(store) => store.quoting_metrics(key, network_size),
Self::Node(store) => store.quoting_metrics(key, data_type, data_size, network_size),
}
}

Expand Down
7 changes: 5 additions & 2 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,18 @@ impl Node {
let resp: QueryResponse = match query {
Query::GetStoreQuote {
key,
data_type,
data_size,
nonce,
difficulty,
} => {
debug!("Got GetStoreQuote request for {key:?} with difficulty {difficulty}");
let record_key = key.to_record_key();
let self_id = network.peer_id();

let maybe_quoting_metrics =
network.get_local_quoting_metrics(record_key.clone()).await;
let maybe_quoting_metrics = network
.get_local_quoting_metrics(record_key.clone(), data_type, data_size)
.await;

let storage_proofs = if let Some(nonce) = nonce {
Self::respond_x_closest_record_proof(
Expand Down
11 changes: 10 additions & 1 deletion ant-protocol/src/messages/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub enum Query {
GetStoreQuote {
/// The Address of the record to be stored.
key: NetworkAddress,
/// DataTypes as represented as its `index`
data_type: u32,
/// Data size of the record
data_size: usize,
/// The random nonce that nodes use to produce the Proof (i.e., hash(record+nonce))
/// Set to None if no need to carry out storage check.
nonce: Option<Nonce>,
Expand Down Expand Up @@ -101,10 +105,15 @@ impl std::fmt::Display for Query {
match self {
Query::GetStoreQuote {
key,
data_type,
data_size,
nonce,
difficulty,
} => {
write!(f, "Query::GetStoreQuote({key:?} {nonce:?} {difficulty})")
write!(
f,
"Query::GetStoreQuote({key:?} {data_type} {data_size} {nonce:?} {difficulty})"
)
}
Query::GetReplicatedRecord { key, requester } => {
write!(f, "Query::GetReplicatedRecord({requester:?} {key:?})")
Expand Down
13 changes: 10 additions & 3 deletions autonomi/src/client/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::LazyLock;

use ant_evm::{Amount, EvmWalletError};
use ant_networking::NetworkError;
use ant_protocol::storage::Chunk;
use ant_protocol::storage::{Chunk, DataTypes};
use ant_protocol::NetworkAddress;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -220,10 +220,17 @@ impl Client {
debug!("Encryption took: {:.2?}", now.elapsed());

// Pay for all chunks
let xor_names: Vec<_> = chunks.iter().map(|chunk| *chunk.name()).collect();
let xor_names: Vec<_> = chunks
.iter()
.map(|chunk| (*chunk.name(), chunk.serialised_size()))
.collect();
info!("Paying for {} addresses", xor_names.len());
let (receipt, skipped_payments) = self
.pay_for_content_addrs(xor_names.into_iter(), payment_option)
.pay_for_content_addrs(
DataTypes::Chunk.get_index(),
xor_names.into_iter(),
payment_option,
)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

Expand Down
20 changes: 12 additions & 8 deletions autonomi/src/client/data/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,20 @@ impl Client {
info!("Uploading datamap chunk to the network at: {data_map_addr:?}");

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];
let mut xor_names = vec![(map_xor_name, data_map_chunk.serialised_size())];

for chunk in &chunks {
xor_names.push(*chunk.name());
xor_names.push((*chunk.name(), chunk.serialised_size()));
}

// Pay for all chunks + data map chunk
info!("Paying for {} addresses", xor_names.len());
let (receipt, skipped_payments) = self
.pay_for_content_addrs(xor_names.into_iter(), payment_option)
.pay_for_content_addrs(
DataTypes::Chunk.get_index(),
xor_names.into_iter(),
payment_option,
)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

Expand Down Expand Up @@ -145,15 +149,15 @@ impl Client {
/// Get the estimated cost of storing a piece of data.
pub async fn data_cost(&self, data: Bytes) -> Result<AttoTokens, CostError> {
let now = ant_networking::time::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
let (data_map_chunks, chunks) = encrypt(data)?;

debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut content_addrs = vec![map_xor_name];
let map_xor_name = *data_map_chunks.address().xorname();
let mut content_addrs = vec![(map_xor_name, data_map_chunks.serialised_size())];

for chunk in &chunks {
content_addrs.push(*chunk.name());
content_addrs.push((*chunk.name(), chunk.serialised_size()));
}

info!(
Expand All @@ -162,7 +166,7 @@ impl Client {
);

let store_quote = self
.get_store_quotes(content_addrs.into_iter())
.get_store_quotes(DataTypes::Chunk.get_index(), content_addrs.into_iter())
.await
.inspect_err(|err| error!("Error getting store quotes: {err:?}"))?;

Expand Down
17 changes: 12 additions & 5 deletions autonomi/src/client/external_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ impl Client {
/// Returns a cost map, data payments to be executed and a list of free (already paid for) chunks.
pub async fn get_quotes_for_content_addresses(
&self,
content_addrs: impl Iterator<Item = XorName> + Clone,
data_type: u32,
content_addrs: impl Iterator<Item = (XorName, usize)> + Clone,
) -> Result<
(
HashMap<XorName, QuoteForAddress>,
Expand All @@ -26,14 +27,20 @@ impl Client {
),
PutError,
> {
let quote = self.get_store_quotes(content_addrs.clone()).await?;
let quote = self
.get_store_quotes(data_type, content_addrs.clone())
.await?;
let payments = quote.payments();
let free_chunks = content_addrs
.filter(|addr| !quote.0.contains_key(addr))
let free_chunks: Vec<_> = content_addrs
.filter(|(addr, _)| !quote.0.contains_key(addr))
.collect();
let quotes_per_addr: HashMap<_, _> = quote.0.into_iter().collect();

Ok((quotes_per_addr, payments, free_chunks))
Ok((
quotes_per_addr,
payments,
free_chunks.iter().map(|(addr, _)| *addr).collect(),
))
}
}

Expand Down
14 changes: 12 additions & 2 deletions autonomi/src/client/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ impl Client {
let xor_name = address.xorname();
debug!("Paying for transaction at address: {address:?}");
let (payment_proofs, skipped_payments) = self
.pay(std::iter::once(*xor_name), wallet)
.pay(
DataTypes::GraphEntry.get_index(),
std::iter::once((*xor_name, entry.bytes_for_signature().len())),
wallet,
)
.await
.inspect_err(|err| {
error!("Failed to pay for transaction at address: {address:?} : {err}")
Expand Down Expand Up @@ -144,7 +148,13 @@ impl Client {

let address = GraphEntryAddress::from_owner(pk);
let xor = *address.xorname();
let store_quote = self.get_store_quotes(std::iter::once(xor)).await?;
// TODO: define default size of GraphEntry

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let store_quote = self
.get_store_quotes(
DataTypes::GraphEntry.get_index(),
std::iter::once((xor, 512)),
)
.await?;
let total_cost = AttoTokens::from_atto(
store_quote
.0
Expand Down
5 changes: 3 additions & 2 deletions autonomi/src/client/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ impl From<Receipt> for PaymentOption {
impl Client {
pub(crate) async fn pay_for_content_addrs(
&self,
content_addrs: impl Iterator<Item = XorName> + Clone,
data_type: u32,
content_addrs: impl Iterator<Item = (XorName, usize)> + Clone,
payment_option: PaymentOption,
) -> Result<(Receipt, AlreadyPaidAddressesCount), PayError> {
match payment_option {
PaymentOption::Wallet(wallet) => {
let (receipt, skipped) = self.pay(content_addrs, &wallet).await?;
let (receipt, skipped) = self.pay(data_type, content_addrs, &wallet).await?;
Ok((receipt, skipped))
}
PaymentOption::Receipt(receipt) => Ok((receipt, 0)),
Expand Down
12 changes: 10 additions & 2 deletions autonomi/src/client/pointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ impl Client {
let xor_name = *address.xorname();
debug!("Paying for pointer at address: {address:?}");
let (payment_proofs, _skipped_payments) = self
.pay(std::iter::once(xor_name), wallet)
// TODO: define Pointer default size for pricing

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
.pay(
DataTypes::Pointer.get_index(),
std::iter::once((xor_name, 128)),
wallet,
)
.await
.inspect_err(|err| {
error!("Failed to pay for pointer at address: {address:?} : {err}")
Expand Down Expand Up @@ -126,7 +131,10 @@ impl Client {

let address = PointerAddress::from_owner(pk);
let xor = *address.xorname();
let store_quote = self.get_store_quotes(std::iter::once(xor)).await?;
// TODO: define default size of Pointer

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let store_quote = self
.get_store_quotes(DataTypes::Pointer.get_index(), std::iter::once((xor, 128)))
.await?;
let total_cost = AttoTokens::from_atto(
store_quote
.0
Expand Down
15 changes: 12 additions & 3 deletions autonomi/src/client/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ impl StoreQuote {
impl Client {
pub async fn get_store_quotes(
&self,
content_addrs: impl Iterator<Item = XorName>,
data_type: u32,
content_addrs: impl Iterator<Item = (XorName, usize)>,
) -> Result<StoreQuote, CostError> {
// get all quotes from nodes
let futures: Vec<_> = content_addrs
.into_iter()
.map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr))
.map(|(content_addr, data_size)| {
fetch_store_quote_with_retries(&self.network, content_addr, data_type, data_size)
})
.collect();

let raw_quotes_per_addr = futures::future::try_join_all(futures).await?;
Expand Down Expand Up @@ -149,10 +152,14 @@ impl Client {
async fn fetch_store_quote(
network: &Network,
content_addr: XorName,
data_type: u32,
data_size: usize,
) -> Result<Vec<(PeerId, PaymentQuote)>, NetworkError> {
network
.get_store_quote_from_network(
NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)),
data_type,
data_size,
vec![],
)
.await
Expand All @@ -162,11 +169,13 @@ async fn fetch_store_quote(
async fn fetch_store_quote_with_retries(
network: &Network,
content_addr: XorName,
data_type: u32,
data_size: usize,
) -> Result<(XorName, Vec<(PeerId, PaymentQuote)>), CostError> {
let mut retries = 0;

loop {
match fetch_store_quote(network, content_addr).await {
match fetch_store_quote(network, content_addr, data_type, data_size).await {
Ok(quote) => {
if quote.len() < CLOSE_GROUP_SIZE {
retries += 1;
Expand Down
Loading

0 comments on commit 9135fab

Please sign in to comment.