Skip to content

Commit

Permalink
feat(networking): clear record on valid put
Browse files Browse the repository at this point in the history
If we're able to store data agin, we clear the farthest distance in the replication fetcher.
(The assumed sitation is that the network has grown, and we're not responsible for less data).
  • Loading branch information
joshuef committed Apr 17, 2024
1 parent a24f490 commit 7df0044
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
33 changes: 22 additions & 11 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,27 @@ impl SwarmDriver {
.store_mut()
.put_verified(record, record_type.clone());

// In case the capacity reaches full, restrict replication_fetcher to
// only fetch entries not farther than the current farthest record
if let Err(StoreError::MaxRecords) = result {
let farthest = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest();
self.replication_fetcher.set_farthest_on_full(farthest);
match result {
Ok(_) => {
// We've stored the record fine, which means our replication
// fetcher should not be limited
self.replication_fetcher.clear_farthest_on_full();
}
Err(StoreError::MaxRecords) => {
// In case the capacity reaches full, restrict replication_fetcher to
// only fetch entries not farther than the current farthest record
let farthest = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest();
self.replication_fetcher.set_farthest_on_full(farthest);
}
Err(_) => {
// Nothing special to do for these errors,
// All error cases are further logged and bubbled up below
}
}

// No matter storing the record succeeded or not,
Expand Down Expand Up @@ -511,7 +522,7 @@ impl SwarmDriver {
}

if let Err(err) = result {
error!("Cann't store verified record {record_key:?} locally: {err:?}");
error!("Can't store verified record {record_key:?} locally: {err:?}");
cmd_string = "PutLocalRecord error";
self.log_handling(cmd_string.to_string(), start.elapsed());
return Err(err.into());
Expand Down
21 changes: 15 additions & 6 deletions sn_networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ pub(crate) struct ReplicationFetcher {
event_sender: mpsc::Sender<NetworkEvent>,
/// ilog2 bucket distance range that the incoming key shall be fetched
distance_range: Option<u32>,
/// Restrict fetch range when node is full
farthest_distance: Option<Distance>,
/// Restrict fetch range to closer than this value
/// used when the node is full, but we still have "close" data coming in
/// that is _not_ closer than our farthest max record
farthest_acceptable_distance: Option<Distance>,
}

impl ReplicationFetcher {
Expand All @@ -56,7 +58,7 @@ impl ReplicationFetcher {
on_going_fetches: HashMap::new(),
event_sender,
distance_range: None,
farthest_distance: None,
farthest_acceptable_distance: None,
}
}

Expand All @@ -80,7 +82,7 @@ impl ReplicationFetcher {
let total_incoming_keys = incoming_keys.len();

// In case of node full, restrict fetch range
if let Some(farthest_distance) = self.farthest_distance {
if let Some(farthest_distance) = self.farthest_acceptable_distance {
let mut out_of_range_keys = vec![];
incoming_keys.retain(|(addr, _)| {
let is_in_range = self_address.distance(addr) <= farthest_distance;
Expand Down Expand Up @@ -151,6 +153,13 @@ impl ReplicationFetcher {
keys_to_fetch
}

// Node is storing and or pruning data fine, any fetch (ongoing or new) shall no longer be restricted
// by the fetcher itself (data is checked for being "close" at a higher level before keys are fed in
// to the ReplicationFetcher)
pub(crate) fn clear_farthest_on_full(&mut self) {
self.farthest_acceptable_distance = None;
}

// Node is full, any fetch (ongoing or new) shall no farther than the current farthest.
pub(crate) fn set_farthest_on_full(&mut self, farthest_in: Option<RecordKey>) {
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
Expand All @@ -162,7 +171,7 @@ impl ReplicationFetcher {
return;
};

if let Some(old_farthest_distance) = self.farthest_distance {
if let Some(old_farthest_distance) = self.farthest_acceptable_distance {
if new_farthest_distance >= old_farthest_distance {
return;
}
Expand All @@ -178,7 +187,7 @@ impl ReplicationFetcher {
self_addr.distance(&addr) <= new_farthest_distance
});

self.farthest_distance = Some(new_farthest_distance);
self.farthest_acceptable_distance = Some(new_farthest_distance);
}

// Notify the replication fetcher about a newly added Record to the node.
Expand Down

0 comments on commit 7df0044

Please sign in to comment.