Skip to content

Commit

Permalink
Merge pull request #5782 from knst/fix-circular-net-processing
Browse files Browse the repository at this point in the history
refactor: remove circular dependencies through net_processing (1/N)
  • Loading branch information
PastaPastaPasta authored Jan 10, 2024
2 parents d2a8946 + a9401cc commit 95fad52
Show file tree
Hide file tree
Showing 28 changed files with 2,685 additions and 245 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ BITCOIN_CORE_H = \
util/enumerate.h \
util/epochguard.h \
util/error.h \
util/expected.h \
util/fastrange.h \
util/fees.h \
util/golombrice.h \
Expand Down
41 changes: 20 additions & 21 deletions src/coinjoin/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <evo/deterministicmns.h>
#include <masternode/meta.h>
#include <masternode/sync.h>
#include <net_processing.h>
#include <net.h>
#include <netmessagemaker.h>
#include <shutdown.h>
#include <util/check.h>
Expand All @@ -29,33 +29,32 @@
#include <memory>
#include <univalue.h>

void CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv)
PeerMsgRet CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv)
{
if (fMasternodeMode) return;
if (!m_mn_sync.IsBlockchainSynced()) return;
if (fMasternodeMode) return {};
if (!m_mn_sync.IsBlockchainSynced()) return {};

if (msg_type == NetMsgType::DSQUEUE) {
CCoinJoinClientQueueManager::ProcessDSQueue(peer, peerman, vRecv);
return CCoinJoinClientQueueManager::ProcessDSQueue(peer, vRecv);
}
return {};
}

void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager& peerman, CDataStream& vRecv)
PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataStream& vRecv)
{
CCoinJoinQueue dsq;
vRecv >> dsq;

if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
peerman.Misbehaving(peer.GetId(), 100);
return;
return tl::unexpected{100};
}

if (dsq.masternodeOutpoint.IsNull()) {
auto mnList = deterministicMNManager->GetListAtChainTip();
if (auto dmn = mnList.GetValidMN(dsq.m_protxHash)) {
dsq.masternodeOutpoint = dmn->collateralOutpoint;
} else {
peerman.Misbehaving(peer.GetId(), 10);
return;
return tl::unexpected{10};
}
}

Expand All @@ -67,33 +66,32 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
// process every dsq only once
for (const auto &q: vecCoinJoinQueue) {
if (q == dsq) {
return;
return {};
}
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way the same mn can send another dsq with the same readiness this soon
LogPrint(BCLog::COINJOIN, /* Continued */
"DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n",
peer.GetLogString(), dsq.masternodeOutpoint.ToStringShort());
return;
return {};
}
}
} // cs_vecqueue

LogPrint(BCLog::COINJOIN, "DSQUEUE -- %s new\n", dsq.ToString());

if (dsq.IsTimeOutOfBounds()) return;
if (dsq.IsTimeOutOfBounds()) return {};

auto mnList = deterministicMNManager->GetListAtChainTip();
auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint);
if (!dmn) return;
if (!dmn) return {};

if (dsq.m_protxHash.IsNull()) {
dsq.m_protxHash = dmn->proTxHash;
}

if (!dsq.CheckSignature(dmn->pdmnState->pubKeyOperator.Get())) {
peerman.Misbehaving(peer.GetId(), 10);
return;
return tl::unexpected{10};
}

// if the queue is ready, submit if we can
Expand All @@ -104,7 +102,7 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
})) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- CoinJoin queue (%s) is ready on masternode %s\n", dsq.ToString(),
dmn->pdmnState->addr.ToString());
return;
return {};
} else {
int64_t nLastDsq = mmetaman->GetMetaInfo(dmn->proTxHash)->GetLastDsq();
int64_t nDsqThreshold = mmetaman->GetDsqThreshold(dmn->proTxHash, mnList.GetValidMNsCount());
Expand All @@ -114,7 +112,7 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
if (nLastDsq != 0 && nDsqThreshold > mmetaman->GetDsqCount()) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- Masternode %s is sending too many dsq messages\n",
dmn->proTxHash.ToString());
return;
return {};
}

mmetaman->AllowMixing(dmn->proTxHash);
Expand All @@ -129,9 +127,10 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
}
} // cs_ProcessDSQueue
dsq.Relay(connman);
return {};
}

void CCoinJoinClientManager::ProcessMessage(CNode& peer, PeerManager& peerman, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv)
void CCoinJoinClientManager::ProcessMessage(CNode& peer, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv)
{
if (fMasternodeMode) return;
if (!CCoinJoinClientOptions::IsEnabled()) return;
Expand All @@ -150,7 +149,7 @@ void CCoinJoinClientManager::ProcessMessage(CNode& peer, PeerManager& peerman, C
AssertLockNotHeld(cs_deqsessions);
LOCK(cs_deqsessions);
for (auto& session : deqSessions) {
session.ProcessMessage(peer, peerman, connman, mempool, msg_type, vRecv);
session.ProcessMessage(peer, connman, mempool, msg_type, vRecv);
}
}
}
Expand All @@ -164,7 +163,7 @@ CCoinJoinClientSession::CCoinJoinClientSession(CWallet& wallet, CoinJoinWalletMa
m_queueman(queueman)
{}

void CCoinJoinClientSession::ProcessMessage(CNode& peer, PeerManager& peerman, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv)
void CCoinJoinClientSession::ProcessMessage(CNode& peer, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv)
{
if (fMasternodeMode) return;
if (!CCoinJoinClientOptions::IsEnabled()) return;
Expand Down
11 changes: 6 additions & 5 deletions src/coinjoin/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <coinjoin/util.h>
#include <coinjoin/coinjoin.h>

#include <net_types.h>
#include <util/translation.h>

#include <atomic>
Expand All @@ -23,7 +25,6 @@ class CoinJoinWalletManager;
class CNode;
class CMasternodeSync;
class CTxMemPool;
class PeerManager;

class UniValue;

Expand Down Expand Up @@ -159,7 +160,7 @@ class CCoinJoinClientSession : public CCoinJoinBaseSession
explicit CCoinJoinClientSession(CWallet& wallet, CoinJoinWalletManager& walletman, const CMasternodeSync& mn_sync,
const std::unique_ptr<CCoinJoinClientQueueManager>& queueman);

void ProcessMessage(CNode& peer, PeerManager& peerman, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv);
void ProcessMessage(CNode& peer, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv);

void UnlockCoins();

Expand Down Expand Up @@ -196,8 +197,8 @@ class CCoinJoinClientQueueManager : public CCoinJoinBaseManager
explicit CCoinJoinClientQueueManager(CConnman& _connman, CoinJoinWalletManager& walletman, const CMasternodeSync& mn_sync) :
connman(_connman), m_walletman(walletman), m_mn_sync(mn_sync) {};

void ProcessMessage(const CNode& peer, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv) LOCKS_EXCLUDED(cs_vecqueue);
void ProcessDSQueue(const CNode& peer, PeerManager& peerman, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) LOCKS_EXCLUDED(cs_vecqueue);
PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv);
void DoMaintenance();
};

Expand Down Expand Up @@ -245,7 +246,7 @@ class CCoinJoinClientManager
const std::unique_ptr<CCoinJoinClientQueueManager>& queueman) :
m_wallet(wallet), m_walletman(walletman), m_mn_sync(mn_sync), m_queueman(queueman) {}

void ProcessMessage(CNode& peer, PeerManager& peerman, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv) LOCKS_EXCLUDED(cs_deqsessions);
void ProcessMessage(CNode& peer, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv) LOCKS_EXCLUDED(cs_deqsessions);

bool StartMixing();
void StopMixing();
Expand Down
38 changes: 19 additions & 19 deletions src/coinjoin/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
#include <masternode/meta.h>
#include <masternode/node.h>
#include <masternode/sync.h>
#include <net_processing.h>
#include <net.h>
#include <netmessagemaker.h>
#include <script/interpreter.h>
#include <shutdown.h>
#include <streams.h>
#include <txmempool.h>
#include <util/moneystr.h>
#include <util/ranges.h>
Expand All @@ -25,20 +26,21 @@

constexpr static CAmount DEFAULT_MAX_RAW_TX_FEE{COIN / 10};

void CCoinJoinServer::ProcessMessage(CNode& peer, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv)
PeerMsgRet CCoinJoinServer::ProcessMessage(CNode& peer, std::string_view msg_type, CDataStream& vRecv)
{
if (!fMasternodeMode) return;
if (!m_mn_sync.IsBlockchainSynced()) return;
if (!fMasternodeMode) return {};
if (!m_mn_sync.IsBlockchainSynced()) return {};

if (msg_type == NetMsgType::DSACCEPT) {
ProcessDSACCEPT(peer, vRecv);
} else if (msg_type == NetMsgType::DSQUEUE) {
ProcessDSQUEUE(peer, peerman, vRecv);
return ProcessDSQUEUE(peer, vRecv);
} else if (msg_type == NetMsgType::DSVIN) {
ProcessDSVIN(peer, vRecv);
} else if (msg_type == NetMsgType::DSSIGNFINALTX) {
ProcessDSSIGNFINALTX(vRecv);
}
return {};
}

void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
Expand Down Expand Up @@ -106,58 +108,55 @@ void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
}
}

void CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, PeerManager& peerman, CDataStream& vRecv)
PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv)
{
CCoinJoinQueue dsq;
vRecv >> dsq;

if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
peerman.Misbehaving(peer.GetId(), 100);
return;
return tl::unexpected{100};
}

if (dsq.masternodeOutpoint.IsNull()) {
auto mnList = deterministicMNManager->GetListAtChainTip();
if (auto dmn = mnList.GetValidMN(dsq.m_protxHash)) {
dsq.masternodeOutpoint = dmn->collateralOutpoint;
} else {
peerman.Misbehaving(peer.GetId(), 10);
return;
return tl::unexpected{10};
}
}

{
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;
if (!lockRecv) return {};

// process every dsq only once
for (const auto& q : vecCoinJoinQueue) {
if (q == dsq) {
return;
return {};
}
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way the same mn can send another dsq with the same readiness this soon
LogPrint(BCLog::COINJOIN, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", peer.GetLogString(), dsq.masternodeOutpoint.ToStringShort());
return;
return {};
}
}
} // cs_vecqueue

LogPrint(BCLog::COINJOIN, "DSQUEUE -- %s new\n", dsq.ToString());

if (dsq.IsTimeOutOfBounds()) return;
if (dsq.IsTimeOutOfBounds()) return {};

auto mnList = deterministicMNManager->GetListAtChainTip();
auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint);
if (!dmn) return;
if (!dmn) return {};

if (dsq.m_protxHash.IsNull()) {
dsq.m_protxHash = dmn->proTxHash;
}

if (!dsq.CheckSignature(dmn->pdmnState->pubKeyOperator.Get())) {
peerman.Misbehaving(peer.GetId(), 10);
return;
return tl::unexpected{10};
}

if (!dsq.fReady) {
Expand All @@ -167,17 +166,18 @@ void CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, PeerManager& peerman, CD
//don't allow a few nodes to dominate the queuing process
if (nLastDsq != 0 && nDsqThreshold > mmetaman->GetDsqCount()) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- Masternode %s is sending too many dsq messages\n", dmn->pdmnState->addr.ToString());
return;
return {};
}
mmetaman->AllowMixing(dmn->proTxHash);

LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString());

TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;
if (!lockRecv) return {};
vecCoinJoinQueue.push_back(dsq);
dsq.Relay(connman);
}
return {};
}

void CCoinJoinServer::ProcessDSVIN(CNode& peer, CDataStream& vRecv)
Expand Down
10 changes: 6 additions & 4 deletions src/coinjoin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
#define BITCOIN_COINJOIN_SERVER_H

#include <coinjoin/coinjoin.h>
#include <net.h>

#include <net_types.h>

class CChainState;
class CCoinJoinServer;
class CDataStream;
class CNode;
class CTxMemPool;
class PeerManager;

class UniValue;

Expand Down Expand Up @@ -71,7 +73,7 @@ class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager
void RelayCompletedTransaction(PoolMessage nMessageID) LOCKS_EXCLUDED(cs_coinjoin);

void ProcessDSACCEPT(CNode& peer, CDataStream& vRecv) LOCKS_EXCLUDED(cs_vecqueue);
void ProcessDSQUEUE(const CNode& peer, PeerManager& peerman, CDataStream& vRecv) LOCKS_EXCLUDED(cs_vecqueue);
PeerMsgRet ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv) LOCKS_EXCLUDED(cs_vecqueue);
void ProcessDSVIN(CNode& peer, CDataStream& vRecv) LOCKS_EXCLUDED(cs_coinjoin);
void ProcessDSSIGNFINALTX(CDataStream& vRecv) LOCKS_EXCLUDED(cs_coinjoin);

Expand All @@ -87,7 +89,7 @@ class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager
fUnitTest(false)
{}

void ProcessMessage(CNode& pfrom, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(CNode& pfrom, std::string_view msg_type, CDataStream& vRecv);

bool HasTimedOut() const;
void CheckTimeout();
Expand Down
Loading

0 comments on commit 95fad52

Please sign in to comment.