From de1f7c180df2d85f4268850e8e29cd31813a1875 Mon Sep 17 00:00:00 2001 From: qima Date: Thu, 9 May 2024 03:23:28 +0800 Subject: [PATCH] feat(node): periodically forward reward to specific address --- .github/workflows/benchmark-prs.yml | 4 +- .github/workflows/memcheck.yml | 17 +++- .github/workflows/merge.yml | 6 +- sn_networking/src/record_store.rs | 9 ++ sn_node/Cargo.toml | 4 +- sn_node/src/node.rs | 115 +++++++++++++++++++++++++- sn_transfers/Cargo.toml | 3 + sn_transfers/src/wallet/hot_wallet.rs | 52 ++++++++++-- 8 files changed, 195 insertions(+), 15 deletions(-) diff --git a/.github/workflows/benchmark-prs.yml b/.github/workflows/benchmark-prs.yml index 4d5955b418..0fef4cdc3b 100644 --- a/.github/workflows/benchmark-prs.yml +++ b/.github/workflows/benchmark-prs.yml @@ -327,8 +327,8 @@ jobs: echo "Total swarm_driver long handling times is: $total_num_of_times" echo "Total swarm_driver long handling duration is: $total_long_handling ms" echo "Total average swarm_driver long handling duration is: $average_handling_ms ms" - total_num_of_times_limit_hits="20000" # hits - total_long_handling_limit_ms="170000" # ms + total_num_of_times_limit_hits="30000" # hits + total_long_handling_limit_ms="400000" # ms average_handling_limit_ms="20" # ms if (( $(echo "$total_num_of_times > $total_num_of_times_limit_hits" | bc -l) )); then echo "Swarm_driver long handling times exceeded threshold: $total_num_of_times hits" diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index b71a6ad717..13a522bf60 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -286,7 +286,8 @@ jobs: cat spend_dag_and_statistics.txt env: SN_LOG: "all" - timeout-minutes: 60 + timeout-minutes: 5 + if: always() - name: Check nodes running shell: bash @@ -413,6 +414,20 @@ jobs: echo "Total swarm_driver long handling duration is: $total_long_handling ms" echo "Total average swarm_driver long handling duration is: $average_handling_ms ms" + - name: Verify reward forwarding using rg + shell: bash + timeout-minutes: 1 + run: | + min_reward_forwarding_times="150" + reward_forwarding_count=$(rg "Reward forwarding completed sending spend" $NODE_DATA_PATH -c --stats | \ + rg "(\d+) matches" | rg "\d+" -o) + echo "Carried out $reward_forwarding_count reward forwardings" + if (( $(echo "$reward_forwarding_count < $min_reward_forwarding_times" | bc -l) )); then + echo "Reward forwarding times below the threshold: $reward_forwarding_count" + exit 1 + fi + if: always() + - name: Upload payment wallet initialization log uses: actions/upload-artifact@main with: diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 4ee4c069e7..9d426e57a6 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -930,10 +930,10 @@ jobs: echo "PWD subdirs:" du -sh */ - - name: Download material, 1.6G + - name: Download material, 1.1G shell: bash run: | - wget https://releases.ubuntu.com/16.04/ubuntu-16.04.7-desktop-amd64.iso + wget https://releases.ubuntu.com/14.04.6/ubuntu-14.04.6-desktop-i386.iso ls -l - name: Build binaries @@ -990,7 +990,7 @@ jobs: timeout-minutes: 5 - name: Start a client to upload - run: ~/safe --log-output-dest=data-dir files upload "ubuntu-16.04.7-desktop-amd64.iso" --retry-strategy quick + run: ~/safe --log-output-dest=data-dir files upload "ubuntu-14.04.6-desktop-i386.iso" --retry-strategy quick env: SN_LOG: "all" timeout-minutes: 30 diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 153b790948..57414475ed 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -46,7 +46,16 @@ use walkdir::{DirEntry, WalkDir}; use xor_name::XorName; /// Max number of records a node can store +#[cfg(not(feature = "reward-forward"))] const MAX_RECORDS_COUNT: usize = 2048; +#[cfg(feature = "reward-forward")] +// A spend record is at the size of 4KB roughly. +// Given chunk record is maxed at size of 512KB. +// During Beta phase, it's almost one spend per chunk, +// which makes the average record size is around 256k. +// Given we are targeting small nodes use 1GB diskspace, +// this shall allow around 4K records. +const MAX_RECORDS_COUNT: usize = 4096; /// File name of the recorded historical quoting metrics. const HISTORICAL_QUOTING_METRICS_FILENAME: &str = "historic_quoting_metrics"; diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index be6658ca46..3b639b60fa 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -14,7 +14,7 @@ name = "safenode" path = "src/bin/safenode/main.rs" [features] -default = ["metrics", "upnp"] +default = ["metrics", "upnp", "reward-forward"] local-discovery = ["sn_networking/local-discovery"] otlp = ["sn_logging/otlp"] metrics = ["sn_logging/process-metrics"] @@ -22,7 +22,7 @@ network-contacts = ["sn_peers_acquisition/network-contacts"] open-metrics = ["sn_networking/open-metrics", "prometheus-client"] encrypt-records = ["sn_networking/encrypt-records"] upnp = ["sn_networking/upnp"] - +reward-forward = ["sn_transfers/reward-forward"] [dependencies] assert_fs = "1.0.0" diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index cc93f9f564..a81f701744 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -30,7 +30,7 @@ use sn_protocol::{ messages::{ChunkProof, Cmd, CmdResponse, Query, QueryResponse, Request, Response}, NetworkAddress, PrettyPrintRecordKey, }; -use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens}; +use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK}; use std::{ net::SocketAddr, path::PathBuf, @@ -45,6 +45,15 @@ use tokio::{ task::{spawn, JoinHandle}, }; +#[cfg(feature = "reward-forward")] +use libp2p::kad::{Quorum, Record}; +#[cfg(feature = "reward-forward")] +use sn_networking::PutRecordCfg; +#[cfg(feature = "reward-forward")] +use sn_protocol::storage::{try_serialize_record, RecordKind, SpendAddress}; +#[cfg(feature = "reward-forward")] +use sn_transfers::Hash; + /// Interval to trigger replication of all records to all peers. /// This is the max time it should take. Minimum interval at any ndoe will be half this pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 45; @@ -233,6 +242,17 @@ impl Node { let mut rolling_index = 0; + // use a random timeout to ensure not sync when transmit messages. + let balance_forward_interval: u64 = 10 + * rng.gen_range( + PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S, + ); + let balance_forward_time = Duration::from_secs(balance_forward_interval); + debug!("BalanceForward interval set to {balance_forward_time:?}"); + + let mut balance_forward_interval = tokio::time::interval(balance_forward_time); + let _ = balance_forward_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -284,6 +304,20 @@ impl Node { rolling_index += 1; } } + // runs every balance_forward_interval time + _ = balance_forward_interval.tick() => { + if cfg!(feature = "reward-forward") { + let start = std::time::Instant::now(); + trace!("Periodic balance forward triggered"); + let network = self.network.clone(); + let owner = self.owner.clone(); + + let _handle = spawn(async move { + let _ = Self::try_forward_blance(network, owner); + info!("Periodic blance forward took {:?}", start.elapsed()); + }); + } + } node_cmd = cmds_receiver.recv() => { match node_cmd { Ok(cmd) => { @@ -726,6 +760,85 @@ impl Node { } } } + + // Shall be disabled after Beta + #[cfg(feature = "reward-forward")] + fn try_forward_blance(network: Network, owner: String) -> Result<()> { + let mut spend_requests = vec![]; + { + // load wallet + let mut wallet = HotWallet::load_from(&network.root_dir_path)?; + let balance = wallet.balance(); + + let payee = vec![( + "reward collector".to_string(), + balance, + *NETWORK_ROYALTIES_PK, + )]; + + spend_requests.extend( + wallet + .prepare_forward_signed_spend(payee, Some(Hash::hash(&owner.into_bytes())))?, + ); + } + + let record_kind = RecordKind::Spend; + let put_cfg = PutRecordCfg { + put_quorum: Quorum::Majority, + retry_strategy: None, + use_put_record_to: None, + verification: None, + }; + + info!( + "Reward forwarding sending {} spends in this iteration.", + spend_requests.len() + ); + + for spend_request in spend_requests { + let network_clone = network.clone(); + let put_cfg_clone = put_cfg.clone(); + + // Sent out spend in separate thread to avoid blocking the main one + let _handle = spawn(async move { + let unique_pubkey = *spend_request.unique_pubkey(); + let cash_note_addr = SpendAddress::from_unique_pubkey(&unique_pubkey); + let network_address = NetworkAddress::from_spend_address(cash_note_addr); + + let record_key = network_address.to_record_key(); + let pretty_key = PrettyPrintRecordKey::from(&record_key); + + debug!("Reward forwarding in spend {pretty_key:?}: {spend_request:#?}"); + + let value = if let Ok(value) = try_serialize_record(&[spend_request], record_kind) { + value + } else { + error!("Reward forwarding: Failed to serialise spend {pretty_key:?}"); + return; + }; + + let record = Record { + key: record_key.clone(), + value: value.to_vec(), + publisher: None, + expires: None, + }; + + let result = network_clone.put_record(record, &put_cfg_clone).await; + + match result { + Ok(_) => info!("Reward forwarding completed sending spend {pretty_key:?}"), + Err(err) => { + info!("Reward forwarding: sending spend {pretty_key:?} failed with {err:?}") + } + } + }); + + std::thread::sleep(Duration::from_millis(500)); + } + + Ok(()) + } } async fn chunk_proof_verify_peer( diff --git a/sn_transfers/Cargo.toml b/sn_transfers/Cargo.toml index 5efb66adad..2a5170f1a5 100644 --- a/sn_transfers/Cargo.toml +++ b/sn_transfers/Cargo.toml @@ -10,6 +10,9 @@ readme = "README.md" repository = "https://github.com/maidsafe/safe_network" version = "0.18.0" +[features] +reward-forward = [] + [dependencies] bls = { package = "blsttc", version = "8.0.1" } custom_debug = "~0.6.1" diff --git a/sn_transfers/src/wallet/hot_wallet.rs b/sn_transfers/src/wallet/hot_wallet.rs index 72132718c0..ebd263fb39 100644 --- a/sn_transfers/src/wallet/hot_wallet.rs +++ b/sn_transfers/src/wallet/hot_wallet.rs @@ -340,7 +340,7 @@ impl HotWallet { let created_cash_notes = transfer.cash_notes_for_recipient.clone(); - self.update_local_wallet(transfer, exclusive_access)?; + self.update_local_wallet(transfer, exclusive_access, true)?; trace!("Releasing wallet lock"); // by dropping _exclusive_access Ok(created_cash_notes) @@ -365,12 +365,50 @@ impl HotWallet { self.reload()?; trace!("Wallet locked and loaded!"); - self.update_local_wallet(transfer, exclusive_access)?; + self.update_local_wallet(transfer, exclusive_access, true)?; trace!("Releasing wallet lock"); // by dropping _exclusive_access Ok(created_cash_notes) } + // Create SignedSpends directly to forward all accumulated balance to the receipient. + #[cfg(feature = "reward-forward")] + pub fn prepare_forward_signed_spend( + &mut self, + to: Vec, + reason_hash: Option, + ) -> Result> { + let (available_cash_notes, exclusive_access) = self.available_cash_notes()?; + debug!( + "Available CashNotes for local send: {:#?}", + available_cash_notes + ); + + let reason_hash = reason_hash.unwrap_or_default(); + + // create a unique key for each output + let mut rng = &mut rand::rngs::OsRng; + let to_unique_keys: Vec<_> = to + .into_iter() + .map(|(purpose, amount, address)| { + (amount, purpose, address, DerivationIndex::random(&mut rng)) + }) + .collect(); + + let transfer = OfflineTransfer::new( + available_cash_notes, + to_unique_keys, + self.address(), + reason_hash, + )?; + + let signed_spends = transfer.all_spend_requests.clone(); + + self.update_local_wallet(transfer, exclusive_access, false)?; + + Ok(signed_spends) + } + /// Performs a payment for each content address. /// Includes payment of network royalties. /// Returns the amount paid for storage, including the network royalties fee paid. @@ -519,7 +557,7 @@ impl HotWallet { // write all changes to local wallet let start = Instant::now(); - self.update_local_wallet(offline_transfer, exclusive_access)?; + self.update_local_wallet(offline_transfer, exclusive_access, true)?; trace!( "local_send_storage_payment completed local wallet update in {:?}", start.elapsed() @@ -532,6 +570,7 @@ impl HotWallet { &mut self, transfer: OfflineTransfer, exclusive_access: WalletExclusiveAccess, + insert_into_pending_spends: bool, ) -> Result<()> { // First of all, update client local state. let spent_unique_pubkeys: BTreeSet<_> = transfer @@ -564,9 +603,10 @@ impl HotWallet { start.elapsed() ); } - - for request in transfer.all_spend_requests { - self.unconfirmed_spend_requests.insert(request); + if insert_into_pending_spends { + for request in transfer.all_spend_requests { + self.unconfirmed_spend_requests.insert(request); + } } // store wallet to disk