diff --git a/lib/src/extras/rpc_server.rs b/lib/src/extras/rpc_server.rs index 8e8af3fdeb..01331db6c8 100644 --- a/lib/src/extras/rpc_server.rs +++ b/lib/src/extras/rpc_server.rs @@ -49,7 +49,7 @@ pub fn initialize_rpc_server( )); dispatcher.add(NetworkDispatcher::new(client.network())); if let Some(mempool) = client.mempool() { - dispatcher.add(MempoolDispatcher::new(mempool)); + dispatcher.add(MempoolDispatcher::new(client.consensus_proxy(), mempool)); } dispatcher.add(PolicyDispatcher {}); if let Some(validator_proxy) = client.validator_proxy() { diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index b91a52684c..4029e502b5 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -612,7 +612,7 @@ impl Mempool { (txs, size) } - /// Adds a transaction to the Mempool. + /// Adds a transaction to the local mempool without broadcasting it over the network. pub fn add_transaction( &self, transaction: Transaction, diff --git a/rpc-interface/src/mempool.rs b/rpc-interface/src/mempool.rs index a38ac612bb..7650977be8 100644 --- a/rpc-interface/src/mempool.rs +++ b/rpc-interface/src/mempool.rs @@ -9,11 +9,11 @@ use crate::types::{HashOrTx, MempoolInfo, RPCResult}; pub trait MempoolInterface { type Error; - /// Pushes a raw transaction into the mempool, it will be assigned a default priority. + /// Pushes a raw transaction with a default priority assigned into the mempool and broadcast it to the network. async fn push_transaction(&mut self, raw_tx: String) -> RPCResult; - /// Pushes a raw transaction into the mempool with high priority. + /// Pushes a raw transaction with a high priority assigned into the mempool and broadcast it to the network. async fn push_high_priority_transaction( &mut self, raw_tx: String, diff --git a/rpc-server/src/dispatchers/mempool.rs b/rpc-server/src/dispatchers/mempool.rs index 955894a1ff..bfbe3d6f44 100644 --- a/rpc-server/src/dispatchers/mempool.rs +++ b/rpc-server/src/dispatchers/mempool.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use async_trait::async_trait; +use nimiq_consensus::ConsensusProxy; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_mempool::{mempool::Mempool, mempool_transactions::TxPriority}; +use nimiq_network_libp2p::Network; use nimiq_rpc_interface::{ mempool::MempoolInterface, types::{HashOrTx, MempoolInfo, RPCResult}, @@ -14,12 +16,13 @@ use crate::error::Error; #[allow(dead_code)] pub struct MempoolDispatcher { + consensus: ConsensusProxy, mempool: Arc, } impl MempoolDispatcher { - pub fn new(mempool: Arc) -> Self { - MempoolDispatcher { mempool } + pub fn new(consensus: ConsensusProxy, mempool: Arc) -> Self { + MempoolDispatcher { consensus, mempool } } } @@ -35,8 +38,13 @@ impl MempoolInterface for MempoolDispatcher { let tx = Transaction::deserialize_from_vec(&hex::decode(&raw_tx)?)?; let txid = tx.hash::(); - match self.mempool.add_transaction(tx, None) { - Ok(_) => Ok(txid.into()), + match self.mempool.add_transaction(tx.clone(), None) { + Ok(_) => self + .consensus + .send_transaction(tx) + .await + .map(|_| txid.into()) + .map_err(Error::NetworkError), Err(e) => Err(Error::MempoolError(e)), } } @@ -48,8 +56,16 @@ impl MempoolInterface for MempoolDispatcher { let tx = Transaction::deserialize_from_vec(&hex::decode(&raw_tx)?)?; let txid = tx.hash::(); - match self.mempool.add_transaction(tx, Some(TxPriority::High)) { - Ok(_) => Ok(txid.into()), + match self + .mempool + .add_transaction(tx.clone(), Some(TxPriority::High)) + { + Ok(_) => self + .consensus + .send_transaction(tx) + .await + .map(|_| txid.into()) + .map_err(Error::NetworkError), Err(e) => Err(Error::MempoolError(e)), } }