Skip to content

Commit

Permalink
feat(node): periodically forward reward to specific address
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed May 10, 2024
1 parent 94591d7 commit de1f7c1
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/benchmark-prs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ jobs:
echo "Total swarm_driver long handling times is: $total_num_of_times"
echo "Total swarm_driver long handling duration is: $total_long_handling ms"
echo "Total average swarm_driver long handling duration is: $average_handling_ms ms"
total_num_of_times_limit_hits="20000" # hits
total_long_handling_limit_ms="170000" # ms
total_num_of_times_limit_hits="30000" # hits
total_long_handling_limit_ms="400000" # ms
average_handling_limit_ms="20" # ms
if (( $(echo "$total_num_of_times > $total_num_of_times_limit_hits" | bc -l) )); then
echo "Swarm_driver long handling times exceeded threshold: $total_num_of_times hits"
Expand Down
17 changes: 16 additions & 1 deletion .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ jobs:
cat spend_dag_and_statistics.txt
env:
SN_LOG: "all"
timeout-minutes: 60
timeout-minutes: 5
if: always()

- name: Check nodes running
shell: bash
Expand Down Expand Up @@ -413,6 +414,20 @@ jobs:
echo "Total swarm_driver long handling duration is: $total_long_handling ms"
echo "Total average swarm_driver long handling duration is: $average_handling_ms ms"
- name: Verify reward forwarding using rg
shell: bash
timeout-minutes: 1
run: |
min_reward_forwarding_times="150"
reward_forwarding_count=$(rg "Reward forwarding completed sending spend" $NODE_DATA_PATH -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Carried out $reward_forwarding_count reward forwardings"
if (( $(echo "$reward_forwarding_count < $min_reward_forwarding_times" | bc -l) )); then
echo "Reward forwarding times below the threshold: $reward_forwarding_count"
exit 1
fi
if: always()

- name: Upload payment wallet initialization log
uses: actions/upload-artifact@main
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -930,10 +930,10 @@ jobs:
echo "PWD subdirs:"
du -sh */
- name: Download material, 1.6G
- name: Download material, 1.1G
shell: bash
run: |
wget https://releases.ubuntu.com/16.04/ubuntu-16.04.7-desktop-amd64.iso
wget https://releases.ubuntu.com/14.04.6/ubuntu-14.04.6-desktop-i386.iso
ls -l
- name: Build binaries
Expand Down Expand Up @@ -990,7 +990,7 @@ jobs:
timeout-minutes: 5

- name: Start a client to upload
run: ~/safe --log-output-dest=data-dir files upload "ubuntu-16.04.7-desktop-amd64.iso" --retry-strategy quick
run: ~/safe --log-output-dest=data-dir files upload "ubuntu-14.04.6-desktop-i386.iso" --retry-strategy quick
env:
SN_LOG: "all"
timeout-minutes: 30
Expand Down
9 changes: 9 additions & 0 deletions sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@ use walkdir::{DirEntry, WalkDir};
use xor_name::XorName;

/// Max number of records a node can store
#[cfg(not(feature = "reward-forward"))]
const MAX_RECORDS_COUNT: usize = 2048;
#[cfg(feature = "reward-forward")]
// A spend record is at the size of 4KB roughly.
// Given chunk record is maxed at size of 512KB.
// During Beta phase, it's almost one spend per chunk,
// which makes the average record size is around 256k.
// Given we are targeting small nodes use 1GB diskspace,
// this shall allow around 4K records.
const MAX_RECORDS_COUNT: usize = 4096;

/// File name of the recorded historical quoting metrics.
const HISTORICAL_QUOTING_METRICS_FILENAME: &str = "historic_quoting_metrics";
Expand Down
4 changes: 2 additions & 2 deletions sn_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ name = "safenode"
path = "src/bin/safenode/main.rs"

[features]
default = ["metrics", "upnp"]
default = ["metrics", "upnp", "reward-forward"]
local-discovery = ["sn_networking/local-discovery"]
otlp = ["sn_logging/otlp"]
metrics = ["sn_logging/process-metrics"]
network-contacts = ["sn_peers_acquisition/network-contacts"]
open-metrics = ["sn_networking/open-metrics", "prometheus-client"]
encrypt-records = ["sn_networking/encrypt-records"]
upnp = ["sn_networking/upnp"]

reward-forward = ["sn_transfers/reward-forward"]

[dependencies]
assert_fs = "1.0.0"
Expand Down
115 changes: 114 additions & 1 deletion sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use sn_protocol::{
messages::{ChunkProof, Cmd, CmdResponse, Query, QueryResponse, Request, Response},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens};
use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK};
use std::{
net::SocketAddr,
path::PathBuf,
Expand All @@ -45,6 +45,15 @@ use tokio::{
task::{spawn, JoinHandle},
};

#[cfg(feature = "reward-forward")]
use libp2p::kad::{Quorum, Record};
#[cfg(feature = "reward-forward")]
use sn_networking::PutRecordCfg;
#[cfg(feature = "reward-forward")]
use sn_protocol::storage::{try_serialize_record, RecordKind, SpendAddress};
#[cfg(feature = "reward-forward")]
use sn_transfers::Hash;

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any ndoe will be half this
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 45;
Expand Down Expand Up @@ -233,6 +242,17 @@ impl Node {

let mut rolling_index = 0;

// use a random timeout to ensure not sync when transmit messages.
let balance_forward_interval: u64 = 10
* rng.gen_range(
PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
);
let balance_forward_time = Duration::from_secs(balance_forward_interval);
debug!("BalanceForward interval set to {balance_forward_time:?}");

let mut balance_forward_interval = tokio::time::interval(balance_forward_time);
let _ = balance_forward_interval.tick().await; // first tick completes immediately

loop {
let peers_connected = &peers_connected;

Expand Down Expand Up @@ -284,6 +304,20 @@ impl Node {
rolling_index += 1;
}
}
// runs every balance_forward_interval time
_ = balance_forward_interval.tick() => {
if cfg!(feature = "reward-forward") {
let start = std::time::Instant::now();
trace!("Periodic balance forward triggered");
let network = self.network.clone();
let owner = self.owner.clone();

let _handle = spawn(async move {
let _ = Self::try_forward_blance(network, owner);
info!("Periodic blance forward took {:?}", start.elapsed());
});
}
}
node_cmd = cmds_receiver.recv() => {
match node_cmd {
Ok(cmd) => {
Expand Down Expand Up @@ -726,6 +760,85 @@ impl Node {
}
}
}

// Shall be disabled after Beta
#[cfg(feature = "reward-forward")]
fn try_forward_blance(network: Network, owner: String) -> Result<()> {
let mut spend_requests = vec![];
{
// load wallet
let mut wallet = HotWallet::load_from(&network.root_dir_path)?;
let balance = wallet.balance();

let payee = vec![(
"reward collector".to_string(),
balance,
*NETWORK_ROYALTIES_PK,
)];

spend_requests.extend(
wallet
.prepare_forward_signed_spend(payee, Some(Hash::hash(&owner.into_bytes())))?,
);
}

let record_kind = RecordKind::Spend;
let put_cfg = PutRecordCfg {
put_quorum: Quorum::Majority,
retry_strategy: None,
use_put_record_to: None,
verification: None,
};

info!(
"Reward forwarding sending {} spends in this iteration.",
spend_requests.len()
);

for spend_request in spend_requests {
let network_clone = network.clone();
let put_cfg_clone = put_cfg.clone();

// Sent out spend in separate thread to avoid blocking the main one
let _handle = spawn(async move {
let unique_pubkey = *spend_request.unique_pubkey();
let cash_note_addr = SpendAddress::from_unique_pubkey(&unique_pubkey);
let network_address = NetworkAddress::from_spend_address(cash_note_addr);

let record_key = network_address.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&record_key);

debug!("Reward forwarding in spend {pretty_key:?}: {spend_request:#?}");

let value = if let Ok(value) = try_serialize_record(&[spend_request], record_kind) {
value
} else {
error!("Reward forwarding: Failed to serialise spend {pretty_key:?}");
return;
};

let record = Record {
key: record_key.clone(),
value: value.to_vec(),
publisher: None,
expires: None,
};

let result = network_clone.put_record(record, &put_cfg_clone).await;

match result {
Ok(_) => info!("Reward forwarding completed sending spend {pretty_key:?}"),
Err(err) => {
info!("Reward forwarding: sending spend {pretty_key:?} failed with {err:?}")
}
}
});

std::thread::sleep(Duration::from_millis(500));
}

Ok(())
}
}

async fn chunk_proof_verify_peer(
Expand Down
3 changes: 3 additions & 0 deletions sn_transfers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ readme = "README.md"
repository = "https://github.com/maidsafe/safe_network"
version = "0.18.0"

[features]
reward-forward = []

[dependencies]
bls = { package = "blsttc", version = "8.0.1" }
custom_debug = "~0.6.1"
Expand Down
52 changes: 46 additions & 6 deletions sn_transfers/src/wallet/hot_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl HotWallet {

let created_cash_notes = transfer.cash_notes_for_recipient.clone();

self.update_local_wallet(transfer, exclusive_access)?;
self.update_local_wallet(transfer, exclusive_access, true)?;

trace!("Releasing wallet lock"); // by dropping _exclusive_access
Ok(created_cash_notes)
Expand All @@ -365,12 +365,50 @@ impl HotWallet {
self.reload()?;
trace!("Wallet locked and loaded!");

self.update_local_wallet(transfer, exclusive_access)?;
self.update_local_wallet(transfer, exclusive_access, true)?;

trace!("Releasing wallet lock"); // by dropping _exclusive_access
Ok(created_cash_notes)
}

// Create SignedSpends directly to forward all accumulated balance to the receipient.
#[cfg(feature = "reward-forward")]
pub fn prepare_forward_signed_spend(
&mut self,
to: Vec<TransactionPayeeDetails>,
reason_hash: Option<Hash>,
) -> Result<Vec<SignedSpend>> {
let (available_cash_notes, exclusive_access) = self.available_cash_notes()?;
debug!(
"Available CashNotes for local send: {:#?}",
available_cash_notes
);

let reason_hash = reason_hash.unwrap_or_default();

// create a unique key for each output
let mut rng = &mut rand::rngs::OsRng;
let to_unique_keys: Vec<_> = to
.into_iter()
.map(|(purpose, amount, address)| {
(amount, purpose, address, DerivationIndex::random(&mut rng))
})
.collect();

let transfer = OfflineTransfer::new(
available_cash_notes,
to_unique_keys,
self.address(),
reason_hash,
)?;

let signed_spends = transfer.all_spend_requests.clone();

self.update_local_wallet(transfer, exclusive_access, false)?;

Ok(signed_spends)
}

/// Performs a payment for each content address.
/// Includes payment of network royalties.
/// Returns the amount paid for storage, including the network royalties fee paid.
Expand Down Expand Up @@ -519,7 +557,7 @@ impl HotWallet {

// write all changes to local wallet
let start = Instant::now();
self.update_local_wallet(offline_transfer, exclusive_access)?;
self.update_local_wallet(offline_transfer, exclusive_access, true)?;
trace!(
"local_send_storage_payment completed local wallet update in {:?}",
start.elapsed()
Expand All @@ -532,6 +570,7 @@ impl HotWallet {
&mut self,
transfer: OfflineTransfer,
exclusive_access: WalletExclusiveAccess,
insert_into_pending_spends: bool,
) -> Result<()> {
// First of all, update client local state.
let spent_unique_pubkeys: BTreeSet<_> = transfer
Expand Down Expand Up @@ -564,9 +603,10 @@ impl HotWallet {
start.elapsed()
);
}

for request in transfer.all_spend_requests {
self.unconfirmed_spend_requests.insert(request);
if insert_into_pending_spends {
for request in transfer.all_spend_requests {
self.unconfirmed_spend_requests.insert(request);
}
}

// store wallet to disk
Expand Down

0 comments on commit de1f7c1

Please sign in to comment.