From 9d57535244cae65abd5ed30a8473da0b0fb3d412 Mon Sep 17 00:00:00 2001 From: qima Date: Sat, 13 Apr 2024 00:34:35 +0800 Subject: [PATCH] fix(node): notify replication_fetcher of early completion --- sn_networking/src/cmd.rs | 20 ++++++++++++++++++++ sn_networking/src/lib.rs | 6 ++++++ sn_networking/src/replication_fetcher.rs | 12 ++++++++++++ sn_node/src/put_validation.rs | 5 +++++ 4 files changed, 43 insertions(+) diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 744c43f2bc..147603f560 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -177,6 +177,8 @@ pub enum SwarmCmd { QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)>, }, + // Notify a fetch completion + FetchCompleted(RecordKey), } /// Debug impl for SwarmCmd to avoid printing full Record, instead only RecodKey @@ -297,6 +299,13 @@ impl Debug for SwarmCmd { SwarmCmd::QuoteVerification { quotes } => { write!(f, "SwarmCmd::QuoteVerification of {} quotes", quotes.len()) } + SwarmCmd::FetchCompleted(key) => { + write!( + f, + "SwarmCmd::FetchCompleted({:?})", + PrettyPrintRecordKey::from(key) + ) + } } } } @@ -727,6 +736,17 @@ impl SwarmDriver { self.verify_peer_quote(peer_id, quote); } } + SwarmCmd::FetchCompleted(key) => { + info!( + "Fetch {:?} early completed, may fetched an old version record.", + PrettyPrintRecordKey::from(&key) + ); + cmd_string = "FetchCompleted"; + let new_keys_to_fetch = self.replication_fetcher.notify_fetch_early_completed(key); + if !new_keys_to_fetch.is_empty() { + self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch)); + } + } } self.log_handling(cmd_string.to_string(), start.elapsed()); diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 62046b2484..0851efd682 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -681,6 +681,12 @@ impl Network { response } + /// Notify ReplicationFetch a fetch attempt is completed. + /// (but it won't trigger any real writes to disk, say fetched an old version of register) + pub fn notify_fetch_completed(&self, key: RecordKey) { + self.send_swarm_cmd(SwarmCmd::FetchCompleted(key)) + } + /// Put `Record` to the local RecordStore /// Must be called after the validations are performed on the Record pub fn put_local_record(&self, record: Record) { diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs index e21f56c635..c26f1d2845 100644 --- a/sn_networking/src/replication_fetcher.rs +++ b/sn_networking/src/replication_fetcher.rs @@ -152,6 +152,18 @@ impl ReplicationFetcher { self.next_keys_to_fetch() } + // An early completion of a fetch means the target is an old version record (Register or Spend). + pub(crate) fn notify_fetch_early_completed( + &mut self, + key_in: RecordKey, + ) -> Vec<(PeerId, RecordKey)> { + self.to_be_fetched.retain(|(key, _t, _), _| key != &key_in); + + self.on_going_fetches.retain(|(key, _t), _| key != &key_in); + + self.next_keys_to_fetch() + } + // Returns the set of keys that has to be fetched from the peer/network. // Target must not be under-fetching // and no more than MAX_PARALLEL_FETCH fetches to be undertaken at the same time. diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index d0c07a82b3..9dcc5e459b 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -89,6 +89,9 @@ impl Node { record_key, RecordType::NonChunk(content_hash), ); + } else { + // Notify replication_fetcher to mark the attempt as completed. + self.network.notify_fetch_completed(record.key.clone()); } result } @@ -289,6 +292,8 @@ impl Node { let updated_register = match self.register_validation(®ister, present_locally).await? { Some(reg) => reg, None => { + // Notify replication_fetcher to mark the attempt as completed. + self.network.notify_fetch_completed(key.clone()); return Ok(CmdOk::DataAlreadyPresent); } };