Skip to content

Commit

Permalink
fix: Improve queue transactions handling (#4947)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Murzin <[email protected]>
  • Loading branch information
dima74 authored Aug 15, 2024
1 parent 1161bd6 commit b9f2844
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 54 deletions.
114 changes: 64 additions & 50 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Module with queue actor
use core::time::Duration;
use std::num::NonZeroUsize;
use std::{num::NonZeroUsize, ops::Deref, sync::Arc};

use crossbeam_queue::ArrayQueue;
use dashmap::{mapref::entry::Entry, DashMap};
Expand Down Expand Up @@ -82,6 +82,27 @@ pub struct Failure {
pub err: Error,
}

/// Will remove transaction from the queue on drop.
/// See [`Queue::remove_stale_transaction`] for details.
pub struct TransactionGuard {
tx: AcceptedTransaction,
queue: Arc<Queue>,
}

impl Deref for TransactionGuard {
type Target = AcceptedTransaction;

fn deref(&self) -> &Self::Target {
&self.tx
}
}

impl Drop for TransactionGuard {
fn drop(&mut self) {
self.queue.remove_stale_transaction(&self.tx);
}
}

impl Queue {
/// Makes queue from configuration
pub fn from_config(
Expand Down Expand Up @@ -238,11 +259,10 @@ impl Queue {

/// Pop single transaction from the queue. Removes all transactions that fail the `tx_check`.
fn pop_from_queue(
&self,
seen: &mut Vec<HashOf<SignedTransaction>>,
self: &Arc<Self>,
state_view: &StateView,
expired_transactions: &mut Vec<AcceptedTransaction>,
) -> Option<AcceptedTransaction> {
) -> Option<TransactionGuard> {
loop {
let hash = self.tx_hashes.pop()?;

Expand All @@ -267,8 +287,11 @@ impl Queue {
continue;
}

seen.push(hash);
return Some(tx.clone());
let guard = TransactionGuard {
tx: tx.clone(),
queue: Arc::clone(self),
};
return Some(guard);
}
}

Expand All @@ -282,10 +305,10 @@ impl Queue {
/// BEWARE: Shouldn't be called in parallel with itself.
#[cfg(test)]
fn collect_transactions_for_block(
&self,
self: &Arc<Self>,
state_view: &StateView,
max_txs_in_block: NonZeroUsize,
) -> Vec<AcceptedTransaction> {
) -> Vec<TransactionGuard> {
let mut transactions = Vec::with_capacity(max_txs_in_block.get());
self.get_transactions_for_block(state_view, max_txs_in_block, &mut transactions);
transactions
Expand All @@ -295,21 +318,19 @@ impl Queue {
///
/// BEWARE: Shouldn't be called in parallel with itself.
pub fn get_transactions_for_block(
&self,
self: &Arc<Self>,
state_view: &StateView,
max_txs_in_block: NonZeroUsize,
transactions: &mut Vec<AcceptedTransaction>,
transactions: &mut Vec<TransactionGuard>,
) {
if transactions.len() >= max_txs_in_block.get() {
return;
}

let mut seen_queue = Vec::new();
let mut expired_transactions = Vec::new();

let txs_from_queue = core::iter::from_fn(|| {
self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions)
});
let txs_from_queue =
core::iter::from_fn(|| self.pop_from_queue(state_view, &mut expired_transactions));

let transactions_hashes: IndexSet<HashOf<SignedTransaction>> =
transactions.iter().map(|tx| tx.as_ref().hash()).collect();
Expand All @@ -318,11 +339,6 @@ impl Queue {
.take(max_txs_in_block.get() - transactions.len());
transactions.extend(txs);

seen_queue
.into_iter()
.try_for_each(|hash| self.tx_hashes.push(hash))
.expect("Exceeded the number of transactions pending");

expired_transactions
.into_iter()
.map(|tx| TransactionEvent {
Expand All @@ -335,6 +351,30 @@ impl Queue {
});
}

/// Overview:
/// 1. Transaction is added to queue using [`Queue::push`] method.
/// 2. Transaction is moved to [`Sumeragi::transaction_cache`] using [`Queue::pop_from_queue`] method.
/// Note that transaction is removed from [`Queue::tx_hashes`], but kept in [`Queue::accepted_tx`],
/// this is needed to return `Error::IsInQueue` when adding same transaction twice.
/// 3. When transaction is removed from [`Sumeragi::transaction_cache`]
/// (either because it was expired, or because transaction is commited to blockchain),
/// we should remove transaction from [`Queue::accepted_tx`].
fn remove_stale_transaction(&self, tx: &AcceptedTransaction) {
let removed = self.accepted_txs.remove(&tx.as_ref().hash());
if removed.is_some() {
self.decrease_per_user_tx_count(tx.as_ref().authority());

if self.is_expired(tx) {
let event = TransactionEvent {
hash: tx.as_ref().hash(),
block_height: None,
status: TransactionStatus::Expired,
};
let _ = self.events_sender.send(event.into());
}
}
}

/// Check that the user adhered to the maximum transaction per user limit and increment their transaction count.
fn check_and_increase_per_user_tx_count(&self, account_id: &AccountId) -> Result<(), Error> {
match self.txs_per_user.entry(account_id.clone()) {
Expand Down Expand Up @@ -518,6 +558,7 @@ pub mod tests {
},
&time_source,
);
let queue = Arc::new(queue);
for _ in 0..5 {
queue
.push(accepted_tx_by_someone(&time_source), &state_view)
Expand Down Expand Up @@ -562,6 +603,7 @@ pub mod tests {
let (_time_handle, time_source) = TimeSource::new_mock(Duration::default());
let tx = accepted_tx_by_someone(&time_source);
let queue = Queue::test(config_factory(), &time_source);
let queue = Arc::new(queue);
queue.push(tx.clone(), &state.view()).unwrap();
let mut state_block = state.block();
state_block
Expand Down Expand Up @@ -594,6 +636,7 @@ pub mod tests {
},
&time_source,
);
let queue = Arc::new(queue);
for _ in 0..(max_txs_in_block.get() - 1) {
queue
.push(accepted_tx_by_someone(&time_source), &state_view)
Expand Down Expand Up @@ -624,37 +667,6 @@ pub mod tests {
);
}

// Queue should only drop transactions which are already committed or ttl expired.
// Others should stay in the queue until that moment.
#[test]
async fn transactions_available_after_pop() {
let max_txs_in_block = nonzero!(2_usize);
let kura = Kura::blank_kura_for_testing();
let query_handle = LiveQueryStore::test().start();
let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle));
let state_view = state.view();

let (_time_handle, time_source) = TimeSource::new_mock(Duration::default());

let queue = Queue::test(config_factory(), &time_source);
queue
.push(accepted_tx_by_someone(&time_source), &state_view)
.expect("Failed to push tx into queue");

let a = queue
.collect_transactions_for_block(&state_view, max_txs_in_block)
.into_iter()
.map(|tx| tx.as_ref().hash())
.collect::<Vec<_>>();
let b = queue
.collect_transactions_for_block(&state_view, max_txs_in_block)
.into_iter()
.map(|tx| tx.as_ref().hash())
.collect::<Vec<_>>();
assert_eq!(a.len(), 1);
assert_eq!(a, b);
}

#[test]
async fn custom_expired_transaction_is_rejected() {
const TTL_MS: u64 = 200;
Expand Down Expand Up @@ -703,6 +715,7 @@ pub mod tests {

let mut txs = Vec::new();
time_handle.advance(Duration::from_millis(TTL_MS + 1));
let queue = Arc::new(queue);
queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs);
let expired_tx_event = event_receiver.recv().await.unwrap();
assert!(txs.is_empty());
Expand Down Expand Up @@ -855,6 +868,7 @@ pub mod tests {
},
&time_source,
);
let queue = Arc::new(queue);

// First push by Alice should be fine
queue
Expand Down
12 changes: 8 additions & 4 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! The main event loop that powers sumeragi.
use std::{collections::BTreeSet, sync::mpsc};
use std::{collections::BTreeSet, ops::Deref, sync::mpsc};

use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
use iroha_p2p::UpdateTopology;
use tracing::{span, Level};

use super::{view_change::ProofBuilder, *};
use crate::{block::*, sumeragi::tracing::instrument};
use crate::{block::*, queue::TransactionGuard, sumeragi::tracing::instrument};

/// `Sumeragi` is the implementation of the consensus.
pub struct Sumeragi {
Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct Sumeragi {
/// other subsystems where we can. This way the performance of
/// sumeragi is more dependent on the code that is internal to the
/// subsystem.
pub transaction_cache: Vec<AcceptedTransaction>,
pub transaction_cache: Vec<TransactionGuard>,
/// Metrics for reporting number of view changes in current round
pub view_changes_metric: iroha_telemetry::metrics::ViewChangesGauge,

Expand Down Expand Up @@ -842,7 +842,11 @@ impl Sumeragi {
let tx_cache_non_empty = !self.transaction_cache.is_empty();

if tx_cache_full || (deadline_reached && tx_cache_non_empty) {
let transactions = self.transaction_cache.clone();
let transactions = self
.transaction_cache
.iter()
.map(|tx| tx.deref().clone())
.collect::<Vec<_>>();

let mut state_block = state.block();
let create_block_start_time = Instant::now();
Expand Down
Binary file modified defaults/executor.wasm
Binary file not shown.

0 comments on commit b9f2844

Please sign in to comment.