diff --git a/crates/iroha_core/src/gossiper.rs b/crates/iroha_core/src/gossiper.rs index a821a3d8d61..716486fa1ca 100644 --- a/crates/iroha_core/src/gossiper.rs +++ b/crates/iroha_core/src/gossiper.rs @@ -103,7 +103,7 @@ impl TransactionGossiper { fn gossip_transactions(&self) { let txs = self .queue - .n_random_transactions(self.gossip_size.get(), &self.state.view()); + .gossip_batch(self.gossip_size.get(), &self.state.view()); if txs.is_empty() { return; diff --git a/crates/iroha_core/src/queue.rs b/crates/iroha_core/src/queue.rs index f9a9bf3c330..ac23eb2079d 100644 --- a/crates/iroha_core/src/queue.rs +++ b/crates/iroha_core/src/queue.rs @@ -15,7 +15,6 @@ use iroha_data_model::{ }; use iroha_logger::{trace, warn}; use iroha_primitives::time::TimeSource; -use rand::seq::IteratorRandom; use thiserror::Error; use crate::{prelude::*, EventsSender}; @@ -50,6 +49,8 @@ pub struct Queue { time_source: TimeSource, /// Length of time after which transactions are dropped. pub tx_time_to_live: Duration, + /// Queue to gossip transactions + tx_gossip: ArrayQueue>, } /// Queue push error @@ -117,6 +118,7 @@ impl Queue { capacity_per_user, time_source: TimeSource::new_system(), tx_time_to_live: transaction_time_to_live, + tx_gossip: ArrayQueue::new(capacity.get()), } } @@ -151,20 +153,22 @@ impl Queue { }) } - /// Returns `n` randomly selected transaction from the queue. - pub fn n_random_transactions( - &self, - n: u32, - state_view: &StateView, - ) -> Vec { - self.txs - .iter() - .filter(|e| self.is_pending(e.value(), state_view)) - .map(|e| e.value().clone()) - .choose_multiple( - &mut rand::thread_rng(), - n.try_into().expect("u32 should always fit in usize"), - ) + /// Returns `n` transactions in a batch for gossiping + pub fn gossip_batch(&self, n: u32, state_view: &StateView) -> Vec { + let mut batch = Vec::with_capacity(n as usize); + while let Some(hash) = self.tx_gossip.pop() { + let Some(tx) = self.txs.get(&hash) else { + // NOTE: Transaction already in the blockchain + continue; + }; + if self.is_pending(&tx, state_view) { + batch.push(tx.value().clone()); + if batch.len() >= n as usize { + break; + } + } + } + batch } fn check_tx(&self, tx: &AcceptedTransaction, state_view: &StateView) -> Result<(), Error> { @@ -230,6 +234,9 @@ impl Queue { err: Error::Full, } })?; + if let Err(err_hash) = self.tx_gossip.push(hash) { + warn!(tx=%err_hash, "Gossiper is lagging behind, not able to queue tx for gossiping"); + } let _ = self.events_sender.send( TransactionEvent { hash, @@ -421,6 +428,7 @@ pub mod tests { Self { events_sender: tokio::sync::broadcast::Sender::new(1), tx_hashes: ArrayQueue::new(cfg.capacity.get()), + tx_gossip: ArrayQueue::new(cfg.capacity.get()), txs: DashMap::new(), txs_per_user: DashMap::new(), capacity: cfg.capacity,