Skip to content

Commit

Permalink
fix(node): notify replication_fetcher of early completion
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and joshuef committed Apr 14, 2024
1 parent 3626090 commit 9d57535
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 0 deletions.
20 changes: 20 additions & 0 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub enum SwarmCmd {
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
// Notify a fetch completion
FetchCompleted(RecordKey),
}

/// Debug impl for SwarmCmd to avoid printing full Record, instead only RecodKey
Expand Down Expand Up @@ -297,6 +299,13 @@ impl Debug for SwarmCmd {
SwarmCmd::QuoteVerification { quotes } => {
write!(f, "SwarmCmd::QuoteVerification of {} quotes", quotes.len())
}
SwarmCmd::FetchCompleted(key) => {
write!(
f,
"SwarmCmd::FetchCompleted({:?})",
PrettyPrintRecordKey::from(key)
)
}
}
}
}
Expand Down Expand Up @@ -727,6 +736,17 @@ impl SwarmDriver {
self.verify_peer_quote(peer_id, quote);
}
}
SwarmCmd::FetchCompleted(key) => {
info!(
"Fetch {:?} early completed, may fetched an old version record.",
PrettyPrintRecordKey::from(&key)
);
cmd_string = "FetchCompleted";
let new_keys_to_fetch = self.replication_fetcher.notify_fetch_early_completed(key);
if !new_keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
}
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
Expand Down
6 changes: 6 additions & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,12 @@ impl Network {
response
}

/// Notify ReplicationFetch a fetch attempt is completed.
/// (but it won't trigger any real writes to disk, say fetched an old version of register)
pub fn notify_fetch_completed(&self, key: RecordKey) {
self.send_swarm_cmd(SwarmCmd::FetchCompleted(key))
}

/// Put `Record` to the local RecordStore
/// Must be called after the validations are performed on the Record
pub fn put_local_record(&self, record: Record) {
Expand Down
12 changes: 12 additions & 0 deletions sn_networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ impl ReplicationFetcher {
self.next_keys_to_fetch()
}

// An early completion of a fetch means the target is an old version record (Register or Spend).
pub(crate) fn notify_fetch_early_completed(
&mut self,
key_in: RecordKey,
) -> Vec<(PeerId, RecordKey)> {
self.to_be_fetched.retain(|(key, _t, _), _| key != &key_in);

self.on_going_fetches.retain(|(key, _t), _| key != &key_in);

self.next_keys_to_fetch()
}

// Returns the set of keys that has to be fetched from the peer/network.
// Target must not be under-fetching
// and no more than MAX_PARALLEL_FETCH fetches to be undertaken at the same time.
Expand Down
5 changes: 5 additions & 0 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ impl Node {
record_key,
RecordType::NonChunk(content_hash),
);
} else {
// Notify replication_fetcher to mark the attempt as completed.
self.network.notify_fetch_completed(record.key.clone());
}
result
}
Expand Down Expand Up @@ -289,6 +292,8 @@ impl Node {
let updated_register = match self.register_validation(&register, present_locally).await? {
Some(reg) => reg,
None => {
// Notify replication_fetcher to mark the attempt as completed.
self.network.notify_fetch_completed(key.clone());
return Ok(CmdOk::DataAlreadyPresent);
}
};
Expand Down

0 comments on commit 9d57535

Please sign in to comment.