From 5c670793496e5e6f2fbe058cba0929c8984b2663 Mon Sep 17 00:00:00 2001 From: qima Date: Wed, 10 Apr 2024 19:11:03 +0800 Subject: [PATCH] fix(node): not send out replication when failed read from local --- sn_networking/src/cmd.rs | 5 +++- sn_networking/src/replication_fetcher.rs | 29 ++++++++++++++++-------- sn_node/src/replication.rs | 4 ++-- sn_node/tests/verify_data_location.rs | 10 +++++--- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 87c18dd2c2..744c43f2bc 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -493,7 +493,10 @@ impl SwarmDriver { }; } SwarmCmd::AddLocalRecordAsStored { key, record_type } => { - trace!("Adding Record locally, for {key:?} and {record_type:?}"); + info!( + "Adding Record locally, for {:?} and {record_type:?}", + PrettyPrintRecordKey::from(&key) + ); cmd_string = "AddLocalRecordAsStored"; self.swarm .behaviour_mut() diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs index 046b3d693c..a5cdbdafc5 100644 --- a/sn_networking/src/replication_fetcher.rs +++ b/sn_networking/src/replication_fetcher.rs @@ -231,16 +231,27 @@ impl ReplicationFetcher { // 1, the pending_entries from that node shall be removed from `to_be_fetched` list. // 2, firing event up to notify bad_nodes, hence trigger them to be removed from RT. fn prune_expired_keys_and_slow_nodes(&mut self) { - let mut failed_holders = BTreeSet::default(); + let mut failed_fetches = vec![]; + + self.on_going_fetches + .retain(|(record_key, _), (peer_id, time_out)| { + if *time_out < Instant::now() { + failed_fetches.push((record_key.clone(), *peer_id)); + false + } else { + true + } + }); - self.on_going_fetches.retain(|_, (peer_id, time_out)| { - if *time_out < Instant::now() { - failed_holders.insert(*peer_id); - false - } else { - true - } - }); + let mut failed_holders = BTreeSet::new(); + + for (record_key, peer_id) in failed_fetches { + error!( + "Failed to fetch {:?} from {peer_id:?}", + PrettyPrintRecordKey::from(&record_key) + ); + let _ = failed_holders.insert(peer_id); + } // now to clear any failed nodes from our lists. self.to_be_fetched diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs index 4de65952ff..d5e0c1bee9 100644 --- a/sn_node/src/replication.rs +++ b/sn_node/src/replication.rs @@ -112,7 +112,7 @@ impl Node { error!( "Replicating fresh record {pretty_key:?} get_record_from_store errored: {err:?}" ); - return; + None } }; @@ -124,7 +124,7 @@ impl Node { error!( "Could not get record from store for replication: {pretty_key:?} after 10 retries" ); - break; + return; } retry_count += 1; diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs index e8bed364b6..64edc91138 100644 --- a/sn_node/tests/verify_data_location.rs +++ b/sn_node/tests/verify_data_location.rs @@ -270,7 +270,7 @@ async fn verify_location(all_peers: &Vec, node_rpc_addresses: &[SocketAd .for_each(|expected| failed_peers.push(*expected)); if !failed_peers.is_empty() { - failed.insert(PrettyPrintRecordKey::from(key).into_owned(), failed_peers); + failed.insert(key.clone(), failed_peers); } } @@ -279,9 +279,13 @@ async fn verify_location(all_peers: &Vec, node_rpc_addresses: &[SocketAd println!("Verification failed for {:?} entries", failed.len()); failed.iter().for_each(|(key, failed_peers)| { + let key_addr = NetworkAddress::from_record_key(key); + let pretty_key = PrettyPrintRecordKey::from(key); failed_peers.iter().for_each(|peer| { - println!("Record {key:?} is not stored inside {peer:?}"); - error!("Record {key:?} is not stored inside {peer:?}"); + let peer_addr = NetworkAddress::from_peer(*peer); + let ilog2_distance = peer_addr.distance(&key_addr).ilog2(); + println!("Record {pretty_key:?} is not stored inside {peer:?}, with ilog2 distance to be {ilog2_distance:?}"); + error!("Record {pretty_key:?} is not stored inside {peer:?}, with ilog2 distance to be {ilog2_distance:?}"); }); }); info!("State of each node:");