Skip to content

Commit

Permalink
Merge branch 'main' into local_as_runtime_opt
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach authored Jan 8, 2025
2 parents 46de268 + 605a3eb commit 8d8a1c9
Show file tree
Hide file tree
Showing 23 changed files with 258 additions and 211 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ jobs:
ls -l $ANT_DATA_PATH/client_first/logs
mkdir $ANT_DATA_PATH/client
ls -l $ANT_DATA_PATH
cp ./the-test-data.zip ./the-test-data_1.zip
./target/release/ant --log-output-dest=data-dir --local file upload "./the-test-data_1.zip" > ./second_upload 2>&1
./target/release/ant --log-output-dest=data-dir --local file upload --public "./the-test-data.zip" > ./upload_output_second 2>&1
rg 'Total cost: 0 AttoTokens' ./upload_output_second -c --stats
env:
ANT_LOG: "all"
timeout-minutes: 25

- name: showing the second upload terminal output
run: cat second_upload
run: cat upload_output_second
shell: bash
if: always()

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 4 additions & 11 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256};
use ant_protocol::{
convert_distance_to_u256,
messages::{Cmd, Request, Response},
storage::{RecordHeader, RecordKind, ValidationType},
storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
Expand Down Expand Up @@ -661,19 +661,12 @@ impl SwarmDriver {
let record_type = match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::Chunk => ValidationType::Chunk,
RecordKind::GraphEntry
| RecordKind::Pointer
| RecordKind::Register
| RecordKind::Scratchpad => {
RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
RecordKind::DataOnly(_) => {
let content_hash = XorName::from_content(&record.value);
ValidationType::NonChunk(content_hash)
}
RecordKind::ChunkWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::ScratchpadWithPayment => {
RecordKind::DataWithPayment(_) => {
error!("Record {record_key:?} with payment shall not be stored locally.");
return Err(NetworkError::InCorrectRecordHeader);
}
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
};
use ant_protocol::{
storage::{try_serialize_record, GraphEntry, RecordKind},
storage::{try_serialize_record, DataTypes, GraphEntry, RecordKind},
NetworkAddress, PrettyPrintRecordKey,
};
use itertools::Itertools;
Expand Down Expand Up @@ -415,7 +415,7 @@ impl SwarmDriver {

let bytes = try_serialize_record(
&accumulated_transactions,
RecordKind::GraphEntry,
RecordKind::DataOnly(DataTypes::GraphEntry),
)?;

let new_accumulated_record = Record {
Expand Down
8 changes: 5 additions & 3 deletions ant-networking/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{driver::GetRecordCfg, Network, NetworkError, Result};
use ant_protocol::storage::{GraphEntry, GraphEntryAddress};
use ant_protocol::storage::{DataTypes, GraphEntry, GraphEntryAddress};
use ant_protocol::{
storage::{try_deserialize_record, RecordHeader, RecordKind, RetryStrategy},
NetworkAddress, PrettyPrintRecordKey,
Expand Down Expand Up @@ -37,14 +37,16 @@ impl Network {

pub fn get_graph_entry_from_record(record: &Record) -> Result<Vec<GraphEntry>> {
let header = RecordHeader::from_record(record)?;
if let RecordKind::GraphEntry = header.kind {
if let RecordKind::DataOnly(DataTypes::GraphEntry) = header.kind {
let transactions = try_deserialize_record::<Vec<GraphEntry>>(record)?;
Ok(transactions)
} else {
warn!(
"RecordKind mismatch while trying to retrieve graph_entry from record {:?}",
PrettyPrintRecordKey::from(&record.key)
);
Err(NetworkError::RecordKindMismatch(RecordKind::GraphEntry))
Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly(
DataTypes::GraphEntry,
)))
}
}
60 changes: 30 additions & 30 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
storage::{Pointer, RetryStrategy, Scratchpad, ValidationType},
storage::{DataTypes, Pointer, RetryStrategy, Scratchpad, ValidationType},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
Expand Down Expand Up @@ -632,16 +632,11 @@ impl Network {
}

match kind {
RecordKind::Chunk
| RecordKind::ChunkWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::ScratchpadWithPayment => {
RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
continue;
}
RecordKind::GraphEntry => {
RecordKind::DataOnly(DataTypes::GraphEntry) => {
info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions");

match get_graph_entry_from_record(record) {
Expand All @@ -653,7 +648,7 @@ impl Network {
}
}
}
RecordKind::Register => {
RecordKind::DataOnly(DataTypes::Register) => {
info!("For record {pretty_key:?}, we have a split record for a register. Accumulating registers");
let Ok(register) = try_deserialize_record::<SignedRegister>(record) else {
error!(
Expand All @@ -675,7 +670,7 @@ impl Network {
}
}
}
RecordKind::Pointer => {
RecordKind::DataOnly(DataTypes::Pointer) => {
info!("For record {pretty_key:?}, we have a split record for a pointer. Selecting the one with the highest count");
let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
error!(
Expand All @@ -697,7 +692,7 @@ impl Network {
}
valid_pointer = Some(pointer);
}
RecordKind::Scratchpad => {
RecordKind::DataOnly(DataTypes::Scratchpad) => {
info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count");
let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
error!(
Expand Down Expand Up @@ -733,7 +728,7 @@ impl Network {
.collect::<Vec<GraphEntry>>();
let record = Record {
key: key.clone(),
value: try_serialize_record(&accumulated_transactions, RecordKind::GraphEntry)
value: try_serialize_record(&accumulated_transactions, RecordKind::DataOnly(DataTypes::GraphEntry))
.map_err(|err| {
error!(
"Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}"
Expand All @@ -754,14 +749,15 @@ impl Network {
acc
});

let record_value = try_serialize_record(&signed_register, RecordKind::Register)
.map_err(|err| {
error!(
let record_value =
try_serialize_record(&signed_register, RecordKind::DataOnly(DataTypes::Register))
.map_err(|err| {
error!(
"Error while serializing the merged register for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec();
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand All @@ -772,12 +768,13 @@ impl Network {
return Ok(Some(record));
} else if let Some(pointer) = valid_pointer {
info!("For record {pretty_key:?} task found a valid pointer, returning it.");
let record_value = try_serialize_record(&pointer, RecordKind::Pointer)
.map_err(|err| {
error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();
let record_value =
try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
.map_err(|err| {
error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand All @@ -788,12 +785,15 @@ impl Network {
return Ok(Some(record));
} else if let Some(scratchpad) = valid_scratchpad {
info!("For record {pretty_key:?} task found a valid scratchpad, returning it.");
let record_value = try_serialize_record(&scratchpad, RecordKind::Scratchpad)
.map_err(|err| {
error!("Error while serializing the scratchpad for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();
let record_value =
try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
.map_err(|err| {
error!(
"Error while serializing the scratchpad for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand Down
25 changes: 9 additions & 16 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,19 +820,11 @@ impl RecordStore for NodeRecordStore {
match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::ChunkWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::PointerWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::ScratchpadWithPayment => {
RecordKind::DataWithPayment(_) => {
debug!("Record {record_key:?} with payment shall always be processed.");
}
// Shall not use wildcard, to avoid mis-match during enum update.
RecordKind::Chunk
| RecordKind::GraphEntry
| RecordKind::Pointer
| RecordKind::Register
| RecordKind::Scratchpad => {
RecordKind::DataOnly(_) => {
// Chunk with existing key do not to be stored again.
// Others with same content_hash do not to be stored again,
// otherwise shall be passed further to allow different version of nonchunk
Expand Down Expand Up @@ -1003,7 +995,7 @@ mod tests {

use ant_protocol::convert_distance_to_u256;
use ant_protocol::storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, Scratchpad,
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, DataTypes, Scratchpad,
};
use assert_fs::{
fixture::{PathChild, PathCreateDir},
Expand Down Expand Up @@ -1036,7 +1028,7 @@ mod tests {
fn arbitrary(g: &mut Gen) -> ArbitraryRecord {
let value = match try_serialize_record(
&(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
RecordKind::Chunk,
RecordKind::DataOnly(DataTypes::Chunk),
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
Expand Down Expand Up @@ -1162,7 +1154,7 @@ mod tests {
// Create a record from the chunk
let record = Record {
key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(),
value: try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec(),
value: try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk))?.to_vec(),
expires: None,
publisher: None,
};
Expand Down Expand Up @@ -1334,7 +1326,8 @@ mod tests {
// Create a record from the scratchpad
let record = Record {
key: NetworkAddress::ScratchpadAddress(scratchpad_address).to_record_key(),
value: try_serialize_record(&scratchpad, RecordKind::Scratchpad)?.to_vec(),
value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))?
.to_vec(),
expires: None,
publisher: None,
};
Expand Down Expand Up @@ -1424,7 +1417,7 @@ mod tests {
let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key();
let value = match try_serialize_record(
&(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
RecordKind::Chunk,
RecordKind::DataOnly(DataTypes::Chunk),
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
Expand Down Expand Up @@ -1547,7 +1540,7 @@ mod tests {
&(0..max_records)
.map(|_| rand::random::<u8>())
.collect::<Bytes>(),
RecordKind::Chunk,
RecordKind::DataOnly(DataTypes::Chunk),
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
Expand Down
17 changes: 16 additions & 1 deletion ant-networking/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use libp2p::{
PeerId, Transport as _,
};

const MAX_STREAM_DATA_ENV_STR: &str = "ANT_MAX_STREAM_DATA";

pub(crate) fn build_transport(
keypair: &Keypair,
#[cfg(feature = "open-metrics")] registries: &mut MetricsRegistries,
Expand All @@ -30,5 +32,18 @@ pub(crate) fn build_transport(
fn generate_quic_transport(
keypair: &Keypair,
) -> libp2p::quic::GenTransport<libp2p::quic::tokio::Provider> {
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(keypair))
let mut quic_config = libp2p::quic::Config::new(keypair);
if let Ok(val) = std::env::var(MAX_STREAM_DATA_ENV_STR) {
match val.parse::<u32>() {
Ok(val) => {
quic_config.max_stream_data = val;
tracing::info!("Overriding QUIC connection receive window value to {val}");
}
Err(e) => {
tracing::warn!("QUIC connection receive window value override failed. Could not parse `{MAX_STREAM_DATA_ENV_STR}={val}` as integer: {e}")
}
}
}

libp2p::quic::tokio::Transport::new(quic_config)
}
18 changes: 6 additions & 12 deletions ant-node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use crate::Marker;
use ant_networking::time::Instant;
#[cfg(feature = "open-metrics")]
use ant_networking::MetricsRegistries;
use ant_protocol::storage::DataTypes;
use prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
Expand Down Expand Up @@ -47,14 +48,7 @@ pub(crate) struct NodeMetricsRecorder {

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct PutRecordOk {
record_type: RecordType,
}

#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum RecordType {
Chunk,
Register,
Spend,
record_type: DataTypes,
}

impl NodeMetricsRecorder {
Expand Down Expand Up @@ -157,7 +151,7 @@ impl NodeMetricsRecorder {
let _ = self
.put_record_ok
.get_or_create(&PutRecordOk {
record_type: RecordType::Chunk,
record_type: DataTypes::Chunk,
})
.inc();
}
Expand All @@ -166,7 +160,7 @@ impl NodeMetricsRecorder {
let _ = self
.put_record_ok
.get_or_create(&PutRecordOk {
record_type: RecordType::Register,
record_type: DataTypes::Register,
})
.inc();
}
Expand All @@ -175,7 +169,7 @@ impl NodeMetricsRecorder {
let _ = self
.put_record_ok
.get_or_create(&PutRecordOk {
record_type: RecordType::Spend,
record_type: DataTypes::GraphEntry,
})
.inc();
}
Expand Down
Loading

0 comments on commit 8d8a1c9

Please sign in to comment.