Skip to content

Commit

Permalink
refactor(gossiper): gossip txs at most once (#5079)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara authored Sep 23, 2024
1 parent 8580c57 commit 99fda1f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion crates/iroha_core/src/gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl TransactionGossiper {
fn gossip_transactions(&self) {
let txs = self
.queue
.n_random_transactions(self.gossip_size.get(), &self.state.view());
.gossip_batch(self.gossip_size.get(), &self.state.view());

if txs.is_empty() {
return;
Expand Down
38 changes: 23 additions & 15 deletions crates/iroha_core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use iroha_data_model::{
};
use iroha_logger::{trace, warn};
use iroha_primitives::time::TimeSource;
use rand::seq::IteratorRandom;
use thiserror::Error;

use crate::{prelude::*, EventsSender};
Expand Down Expand Up @@ -50,6 +49,8 @@ pub struct Queue {
time_source: TimeSource,
/// Length of time after which transactions are dropped.
pub tx_time_to_live: Duration,
/// Queue to gossip transactions
tx_gossip: ArrayQueue<HashOf<SignedTransaction>>,
}

/// Queue push error
Expand Down Expand Up @@ -117,6 +118,7 @@ impl Queue {
capacity_per_user,
time_source: TimeSource::new_system(),
tx_time_to_live: transaction_time_to_live,
tx_gossip: ArrayQueue::new(capacity.get()),
}
}

Expand Down Expand Up @@ -151,20 +153,22 @@ impl Queue {
})
}

/// Returns `n` randomly selected transaction from the queue.
pub fn n_random_transactions(
&self,
n: u32,
state_view: &StateView,
) -> Vec<AcceptedTransaction> {
self.txs
.iter()
.filter(|e| self.is_pending(e.value(), state_view))
.map(|e| e.value().clone())
.choose_multiple(
&mut rand::thread_rng(),
n.try_into().expect("u32 should always fit in usize"),
)
/// Returns `n` transactions in a batch for gossiping
pub fn gossip_batch(&self, n: u32, state_view: &StateView) -> Vec<AcceptedTransaction> {
let mut batch = Vec::with_capacity(n as usize);
while let Some(hash) = self.tx_gossip.pop() {
let Some(tx) = self.txs.get(&hash) else {
// NOTE: Transaction already in the blockchain
continue;
};
if self.is_pending(&tx, state_view) {
batch.push(tx.value().clone());
if batch.len() >= n as usize {
break;
}
}
}
batch
}

fn check_tx(&self, tx: &AcceptedTransaction, state_view: &StateView) -> Result<(), Error> {
Expand Down Expand Up @@ -230,6 +234,9 @@ impl Queue {
err: Error::Full,
}
})?;
if let Err(err_hash) = self.tx_gossip.push(hash) {
warn!(tx=%err_hash, "Gossiper is lagging behind, not able to queue tx for gossiping");
}
let _ = self.events_sender.send(
TransactionEvent {
hash,
Expand Down Expand Up @@ -421,6 +428,7 @@ pub mod tests {
Self {
events_sender: tokio::sync::broadcast::Sender::new(1),
tx_hashes: ArrayQueue::new(cfg.capacity.get()),
tx_gossip: ArrayQueue::new(cfg.capacity.get()),
txs: DashMap::new(),
txs_per_user: DashMap::new(),
capacity: cfg.capacity,
Expand Down

0 comments on commit 99fda1f

Please sign in to comment.