Skip to content

Commit

Permalink
fix(node): not send out replication when failed read from local
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and joshuef committed Apr 11, 2024
1 parent 4658048 commit 5c67079
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
5 changes: 4 additions & 1 deletion sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ impl SwarmDriver {
};
}
SwarmCmd::AddLocalRecordAsStored { key, record_type } => {
trace!("Adding Record locally, for {key:?} and {record_type:?}");
info!(
"Adding Record locally, for {:?} and {record_type:?}",
PrettyPrintRecordKey::from(&key)
);
cmd_string = "AddLocalRecordAsStored";
self.swarm
.behaviour_mut()
Expand Down
29 changes: 20 additions & 9 deletions sn_networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,27 @@ impl ReplicationFetcher {
// 1, the pending_entries from that node shall be removed from `to_be_fetched` list.
// 2, firing event up to notify bad_nodes, hence trigger them to be removed from RT.
fn prune_expired_keys_and_slow_nodes(&mut self) {
let mut failed_holders = BTreeSet::default();
let mut failed_fetches = vec![];

self.on_going_fetches
.retain(|(record_key, _), (peer_id, time_out)| {
if *time_out < Instant::now() {
failed_fetches.push((record_key.clone(), *peer_id));
false
} else {
true
}
});

self.on_going_fetches.retain(|_, (peer_id, time_out)| {
if *time_out < Instant::now() {
failed_holders.insert(*peer_id);
false
} else {
true
}
});
let mut failed_holders = BTreeSet::new();

for (record_key, peer_id) in failed_fetches {
error!(
"Failed to fetch {:?} from {peer_id:?}",
PrettyPrintRecordKey::from(&record_key)
);
let _ = failed_holders.insert(peer_id);
}

// now to clear any failed nodes from our lists.
self.to_be_fetched
Expand Down
4 changes: 2 additions & 2 deletions sn_node/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Node {
error!(
"Replicating fresh record {pretty_key:?} get_record_from_store errored: {err:?}"
);
return;
None
}
};

Expand All @@ -124,7 +124,7 @@ impl Node {
error!(
"Could not get record from store for replication: {pretty_key:?} after 10 retries"
);
break;
return;
}

retry_count += 1;
Expand Down
10 changes: 7 additions & 3 deletions sn_node/tests/verify_data_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn verify_location(all_peers: &Vec<PeerId>, node_rpc_addresses: &[SocketAd
.for_each(|expected| failed_peers.push(*expected));

if !failed_peers.is_empty() {
failed.insert(PrettyPrintRecordKey::from(key).into_owned(), failed_peers);
failed.insert(key.clone(), failed_peers);
}
}

Expand All @@ -279,9 +279,13 @@ async fn verify_location(all_peers: &Vec<PeerId>, node_rpc_addresses: &[SocketAd
println!("Verification failed for {:?} entries", failed.len());

failed.iter().for_each(|(key, failed_peers)| {
let key_addr = NetworkAddress::from_record_key(key);
let pretty_key = PrettyPrintRecordKey::from(key);
failed_peers.iter().for_each(|peer| {
println!("Record {key:?} is not stored inside {peer:?}");
error!("Record {key:?} is not stored inside {peer:?}");
let peer_addr = NetworkAddress::from_peer(*peer);
let ilog2_distance = peer_addr.distance(&key_addr).ilog2();
println!("Record {pretty_key:?} is not stored inside {peer:?}, with ilog2 distance to be {ilog2_distance:?}");
error!("Record {pretty_key:?} is not stored inside {peer:?}, with ilog2 distance to be {ilog2_distance:?}");
});
});
info!("State of each node:");
Expand Down

0 comments on commit 5c67079

Please sign in to comment.