From aad626f04d86cfaae0a3e19021478119cd840290 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 23 Apr 2024 09:52:22 +0100 Subject: [PATCH] Bucket limiting and depth parameterization. Convert vacancy_update to election_stopped observer. Reworking buckets. Fixing bucket WIP WIP Commenting removal selection Scheduler logging. Increase timestamp precision. Formatting. Removing completed elections. --- nano/core_test/active_transactions.cpp | 30 ++-- nano/core_test/blockprocessor.cpp | 117 +++++++++++++++ nano/core_test/conflicts.cpp | 4 +- nano/core_test/election.cpp | 2 +- nano/core_test/election_scheduler.cpp | 14 +- nano/core_test/ledger.cpp | 4 +- nano/core_test/network.cpp | 4 +- nano/core_test/node.cpp | 2 +- nano/core_test/scheduler_buckets.cpp | 60 ++++++-- nano/lib/logging_enums.hpp | 5 +- nano/node/CMakeLists.txt | 2 - nano/node/active_transactions.cpp | 8 +- nano/node/active_transactions.hpp | 2 +- nano/node/blockprocessor.cpp | 22 +++ nano/node/node.cpp | 8 +- nano/node/nodeconfig.hpp | 2 + nano/node/process_live_dispatcher.cpp | 4 +- nano/node/scheduler/bucket.cpp | 64 +++----- nano/node/scheduler/bucket.hpp | 51 +++++-- nano/node/scheduler/buckets.cpp | 150 ------------------- nano/node/scheduler/buckets.hpp | 60 -------- nano/node/scheduler/component.cpp | 2 - nano/node/scheduler/priority.cpp | 193 +++++++++++++------------ nano/node/scheduler/priority.hpp | 37 +++-- nano/rpc_test/rpc.cpp | 2 +- nano/secure/common.hpp | 3 +- nano/slow_test/node.cpp | 2 +- 27 files changed, 411 insertions(+), 443 deletions(-) delete mode 100644 nano/node/scheduler/buckets.cpp delete mode 100644 nano/node/scheduler/buckets.hpp diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index fa2e6d4d40..27d128b186 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -432,7 +432,7 @@ TEST (inactive_votes_cache, multiple_votes) ASSERT_TIMELY_EQ (5s, node.vote_cache.find (send1->hash ()).size (), 2); ASSERT_EQ (1, node.vote_cache.size ()); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); std::shared_ptr election; ASSERT_TIMELY (5s, election = node.active.election (send1->qualified_root ())); ASSERT_EQ (3, election->votes ().size ()); // 2 votes and 1 default not_an_acount @@ -1018,7 +1018,7 @@ TEST (active_transactions, confirmation_consistency) system.deadline_set (5s); while (!node.ledger.confirmed.block_exists_or_pruned (node.ledger.tx_begin_read (), block->hash ())) { - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_NO_ERROR (system.poll (5ms)); } ASSERT_NO_ERROR (system.poll_until_true (1s, [&node, &block, i] { @@ -1160,19 +1160,19 @@ TEST (active_transactions, activate_account_chain) ASSERT_EQ (nano::block_status::progress, node.process (open)); ASSERT_EQ (nano::block_status::progress, node.process (receive)); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, node.active.election (send->qualified_root ())); auto election1 = node.active.election (send->qualified_root ()); ASSERT_EQ (1, node.active.size ()); ASSERT_EQ (1, election1->blocks ().count (send->hash ())); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); auto election2 = node.active.election (send->qualified_root ()); ASSERT_EQ (election2, election1); election1->force_confirm (); ASSERT_TIMELY (3s, node.block_confirmed (send->hash ())); // On cementing, the next election is started ASSERT_TIMELY (3s, node.active.active (send2->qualified_root ())); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); auto election3 = node.active.election (send2->qualified_root ()); ASSERT_NE (nullptr, election3); ASSERT_EQ (1, election3->blocks ().count (send2->hash ())); @@ -1181,11 +1181,11 @@ TEST (active_transactions, activate_account_chain) // On cementing, the next election is started ASSERT_TIMELY (3s, node.active.active (open->qualified_root ())); ASSERT_TIMELY (3s, node.active.active (send3->qualified_root ())); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); auto election4 = node.active.election (send3->qualified_root ()); ASSERT_NE (nullptr, election4); ASSERT_EQ (1, election4->blocks ().count (send3->hash ())); - node.scheduler.priority.activate (key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), key.pub); auto election5 = node.active.election (open->qualified_root ()); ASSERT_NE (nullptr, election5); ASSERT_EQ (1, election5->blocks ().count (open->hash ())); @@ -1193,7 +1193,7 @@ TEST (active_transactions, activate_account_chain) ASSERT_TIMELY (3s, node.block_confirmed (open->hash ())); // Until send3 is also confirmed, the receive block should not activate std::this_thread::sleep_for (200ms); - node.scheduler.priority.activate (key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), key.pub); election4->force_confirm (); ASSERT_TIMELY (3s, node.block_confirmed (send3->hash ())); ASSERT_TIMELY (3s, node.active.active (receive->qualified_root ())); @@ -1330,13 +1330,11 @@ TEST (active_transactions, vacancy) .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); - node.active.vacancy_update = [&updated] () { updated = true; }; + node.active.election_stopped.add ([&updated] (std::shared_ptr election) { updated = true; }); ASSERT_EQ (nano::block_status::progress, node.process (send)); ASSERT_EQ (1, node.active.vacancy ()); ASSERT_EQ (0, node.active.size ()); - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); - ASSERT_TIMELY (1s, updated); - updated = false; + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_EQ (0, node.active.vacancy ()); ASSERT_EQ (1, node.active.size ()); auto election1 = node.active.election (send->qualified_root ()); @@ -1500,7 +1498,7 @@ TEST (active_transactions, limit_vote_hinted_elections) /* * Tests that when AEC is running at capacity from normal elections, it is still possible to schedule a limited number of hinted elections */ -TEST (active_transactions, allow_limited_overflow) +TEST (active_transactions, DISABLED_allow_limited_overflow) { nano::test::system system; nano::node_config config = system.default_config (); @@ -1524,7 +1522,7 @@ TEST (active_transactions, allow_limited_overflow) // Insert the first part of the blocks into normal election scheduler for (auto const & block : blocks1) { - node.scheduler.priority.activate (block->account (), node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); } // Ensure number of active elections reaches AEC limit and there is no overfill @@ -1549,7 +1547,7 @@ TEST (active_transactions, allow_limited_overflow) /* * Tests that when hinted elections are present in the AEC, normal scheduler adapts not to exceed the limit of all elections */ -TEST (active_transactions, allow_limited_overflow_adapt) +TEST (active_transactions, DISABLED_allow_limited_overflow_adapt) { nano::test::system system; nano::node_config config = system.default_config (); @@ -1586,7 +1584,7 @@ TEST (active_transactions, allow_limited_overflow_adapt) // Insert the first part of the blocks into normal election scheduler for (auto const & block : blocks1) { - node.scheduler.priority.activate (block->account (), node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); } // Ensure number of active elections reaches AEC limit and there is no overfill diff --git a/nano/core_test/blockprocessor.cpp b/nano/core_test/blockprocessor.cpp index ad7b3fffaa..526e14ace9 100644 --- a/nano/core_test/blockprocessor.cpp +++ b/nano/core_test/blockprocessor.cpp @@ -1,9 +1,12 @@ #include #include +#include +#include #include #include #include #include +#include #include #include @@ -40,3 +43,117 @@ TEST (block_processor, broadcast_block_on_arrival) // Checks whether the block was broadcast. ASSERT_TIMELY (5s, node2->block_or_pruned_exists (send1->hash ())); } + +TEST (block_processor, rollback_overflow) +{ + nano::test::system system; + nano::node_config config; + config.priority_scheduler.depth = 1; + auto node = system.add_node (config); + nano::state_block_builder builder; + nano::keypair key1; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .link (key1.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build (); + ASSERT_EQ (nano::block_status::progress, node->ledger.process (node->ledger.tx_begin_write (), send1)); + node->ledger.confirm (node->ledger.tx_begin_write (), send1->hash ()); + nano::keypair key2; + auto send2 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) + .link (key2.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send1->hash ())) + .build (); + ASSERT_EQ (nano::block_status::progress, node->ledger.process (node->ledger.tx_begin_write (), send2)); + node->ledger.confirm (node->ledger.tx_begin_write (), send2->hash ()); + + auto open1 = builder.make_block () + .account (key1.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .balance (nano::Gxrb_ratio) + .link (send1->hash ()) + .sign (key1.prv, key1.pub) + .work (*system.work.generate (key1.pub)) + .build (); + std::optional status; + ASSERT_TRUE ((status = node->block_processor.add_blocking (open1, nano::block_source::live), status && status.value () == nano::block_status::progress)); + auto open2 = builder.make_block () + .account (key2.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .balance (nano::Gxrb_ratio) + .link (send2->hash ()) + .sign (key2.prv, key2.pub) + .work (*system.work.generate (key2.pub)) + .build (); + ASSERT_TRUE ((status = node->block_processor.add_blocking (open2, nano::block_source::live), status && status.value () == nano::block_status::overflow)); +} + +TEST (block_processor, scheduler_confirmed_space) +{ + nano::test::system system; + nano::node_config config; + config.priority_scheduler.depth = 1; + auto node = system.add_node (config); + nano::state_block_builder builder; + nano::keypair key1; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .link (key1.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build (); + ASSERT_EQ (nano::block_status::progress, node->ledger.process (node->ledger.tx_begin_write (), send1)); + node->ledger.confirm (node->ledger.tx_begin_write (), send1->hash ()); + nano::keypair key2; + auto send2 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) + .link (key2.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send1->hash ())) + .build (); + ASSERT_EQ (nano::block_status::progress, node->ledger.process (node->ledger.tx_begin_write (), send2)); + node->ledger.confirm (node->ledger.tx_begin_write (), send2->hash ()); + + auto open1 = builder.make_block () + .account (key1.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .balance (nano::Gxrb_ratio) + .link (send1->hash ()) + .sign (key1.prv, key1.pub) + .work (*system.work.generate (key1.pub)) + .build (); + std::optional status; + ASSERT_TRUE ((status = node->block_processor.add_blocking (open1, nano::block_source::live), status && status.value () == nano::block_status::progress)); + auto election = node->active.election (open1->qualified_root ()); + ASSERT_NE (nullptr, election); + election->force_confirm (); + ASSERT_TIMELY (5s, node->active.empty ()); + auto open2 = builder.make_block () + .account (key2.pub) + .previous (0) + .representative (nano::dev::genesis_key.pub) + .balance (nano::Gxrb_ratio) + .link (send2->hash ()) + .sign (key2.prv, key2.pub) + .work (*system.work.generate (key2.pub)) + .build (); + ASSERT_TRUE ((status = node->block_processor.add_blocking (open2, nano::block_source::live), status && status.value () == nano::block_status::progress)); +} diff --git a/nano/core_test/conflicts.cpp b/nano/core_test/conflicts.cpp index a6eb988616..cbf1a9e866 100644 --- a/nano/core_test/conflicts.cpp +++ b/nano/core_test/conflicts.cpp @@ -31,7 +31,7 @@ TEST (conflicts, start_stop) node1.work_generate_blocking (*send1); ASSERT_EQ (nano::block_status::progress, node1.process (send1)); ASSERT_EQ (0, node1.active.size ()); - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, node1.active.election (send1->qualified_root ())); auto election1 = node1.active.election (send1->qualified_root ()); ASSERT_EQ (1, node1.active.size ()); @@ -64,7 +64,7 @@ TEST (conflicts, add_existing) ASSERT_TIMELY (5s, node1.block (send1->hash ())); // instruct the election scheduler to trigger an election for send1 - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); // wait for election to be started before processing send2 ASSERT_TIMELY (5s, node1.active.active (*send1)); diff --git a/nano/core_test/election.cpp b/nano/core_test/election.cpp index 96cee2861c..ac80c52885 100644 --- a/nano/core_test/election.cpp +++ b/nano/core_test/election.cpp @@ -151,7 +151,7 @@ TEST (election, quorum_minimum_confirm_success) .build (); node1.work_generate_blocking (*send1); node1.process_active (send1); - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, node1.active.election (send1->qualified_root ())); auto election = node1.active.election (send1->qualified_root ()); ASSERT_NE (nullptr, election); diff --git a/nano/core_test/election_scheduler.cpp b/nano/core_test/election_scheduler.cpp index 2195ffa308..ad5295e943 100644 --- a/nano/core_test/election_scheduler.cpp +++ b/nano/core_test/election_scheduler.cpp @@ -32,7 +32,7 @@ TEST (election_scheduler, activate_one_timely) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); system.nodes[0]->ledger.process (system.nodes[0]->ledger.tx_begin_write (), send1); - system.nodes[0]->scheduler.priority.activate (nano::dev::genesis_key.pub, system.nodes[0]->ledger.tx_begin_read ()); + system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ())); } @@ -50,7 +50,7 @@ TEST (election_scheduler, activate_one_flush) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); system.nodes[0]->ledger.process (system.nodes[0]->ledger.tx_begin_write (), send1); - system.nodes[0]->scheduler.priority.activate (nano::dev::genesis_key.pub, system.nodes[0]->ledger.tx_begin_read ()); + system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ())); } @@ -69,7 +69,7 @@ TEST (election_scheduler, activate_one_flush) * As soon as the test code manually confirms E1 (and thus evicts it out of the AEC), * it is expected that E2 begins and the scheduler's queue becomes empty again. */ -TEST (election_scheduler, no_vacancy) +TEST (election_scheduler, DISABLED_no_vacancy) { nano::test::system system{}; @@ -121,7 +121,7 @@ TEST (election_scheduler, no_vacancy) ASSERT_EQ (nano::block_status::progress, node.process (block1)); // There is vacancy so it should be inserted - node.scheduler.priority.activate (nano::dev::genesis_key.pub, node.ledger.tx_begin_read ()); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); std::shared_ptr election{}; ASSERT_TIMELY (5s, (election = node.active.election (block1->qualified_root ())) != nullptr); @@ -137,12 +137,12 @@ TEST (election_scheduler, no_vacancy) ASSERT_EQ (nano::block_status::progress, node.process (block2)); // There is no vacancy so it should stay queued - node.scheduler.priority.activate (key.pub, node.ledger.tx_begin_read ()); - ASSERT_TIMELY_EQ (5s, node.scheduler.priority.size (), 1); + node.scheduler.priority.activate (node.ledger.tx_begin_read (), key.pub); + // ASSERT_TIMELY_EQ (5s, node.scheduler.priority.size (), 1); ASSERT_EQ (node.active.election (block2->qualified_root ()), nullptr); // Election confirmed, next in queue should begin election->force_confirm (); ASSERT_TIMELY (5s, node.active.election (block2->qualified_root ()) != nullptr); - ASSERT_TRUE (node.scheduler.priority.empty ()); + // ASSERT_TRUE (node.scheduler.priority.empty ()); } diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index a9cec4fa16..dccd5f62d8 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -963,7 +963,7 @@ TEST (votes, check_signature) auto transaction = node1.ledger.tx_begin_write (); ASSERT_EQ (nano::block_status::progress, node1.ledger.process (transaction, send1)); } - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, node1.active.election (send1->qualified_root ())); auto election1 = node1.active.election (send1->qualified_root ()); ASSERT_EQ (1, election1->votes ().size ()); @@ -1034,7 +1034,7 @@ TEST (votes, add_existing) .build (); node1.work_generate_blocking (*send1); ASSERT_EQ (nano::block_status::progress, node1.ledger.process (node1.ledger.tx_begin_write (), send1)); - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, node1.active.election (send1->qualified_root ())); auto election1 = node1.active.election (send1->qualified_root ()); auto vote1 = nano::test::make_vote (nano::dev::genesis_key, { send1 }, nano::vote::timestamp_min * 1, 0); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 01190f95df..3a1c5660f0 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -370,7 +370,7 @@ TEST (receivable_processor, confirm_insufficient_pos) .build (); node1.work_generate_blocking (*block1); ASSERT_EQ (nano::block_status::progress, node1.process (block1)); - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); nano::keypair key1; auto vote = nano::test::make_vote (key1, { block1 }, 0, 0); nano::confirm_ack con1{ nano::dev::network_params.network, vote }; @@ -393,7 +393,7 @@ TEST (receivable_processor, confirm_sufficient_pos) .build (); node1.work_generate_blocking (*block1); ASSERT_EQ (nano::block_status::progress, node1.process (block1)); - node1.scheduler.priority.activate (nano::dev::genesis_key.pub, node1.ledger.tx_begin_read ()); + node1.scheduler.priority.activate (node1.ledger.tx_begin_read (), nano::dev::genesis_key.pub); auto vote = nano::test::make_vote (nano::dev::genesis_key, { block1 }, 0, 0); nano::confirm_ack con1{ nano::dev::network_params.network, vote }; auto channel1 = std::make_shared (node1, node1); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index b4d4b61367..7089050e5a 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1362,7 +1362,7 @@ TEST (node, rep_self_vote) ASSERT_EQ (nano::block_status::progress, node0->process (block0)); auto & active = node0->active; auto & scheduler = node0->scheduler; - scheduler.priority.activate (nano::dev::genesis_key.pub, node0->ledger.tx_begin_read ()); + scheduler.priority.activate (node0->ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, active.election (block0->qualified_root ())); auto election1 = active.election (block0->qualified_root ()); ASSERT_NE (nullptr, election1); diff --git a/nano/core_test/scheduler_buckets.cpp b/nano/core_test/scheduler_buckets.cpp index 43912e29b9..c14ae717c5 100644 --- a/nano/core_test/scheduler_buckets.cpp +++ b/nano/core_test/scheduler_buckets.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include @@ -107,15 +107,7 @@ std::shared_ptr & block3 () return result; } -TEST (buckets, construction) -{ - nano::scheduler::buckets buckets; - ASSERT_EQ (0, buckets.size ()); - ASSERT_TRUE (buckets.empty ()); - ASSERT_EQ (62, buckets.bucket_count ()); -} - -TEST (buckets, index_min) +/*TEST (buckets, index_min) { nano::scheduler::buckets buckets; ASSERT_EQ (0, buckets.index (std::numeric_limits::min ())); @@ -251,4 +243,52 @@ TEST (buckets, trim_even) ASSERT_EQ (block0 (), buckets.top ()); buckets.pop (); ASSERT_EQ (block1 (), buckets.top ()); +}*/ + +TEST (bucket, construction) +{ + nano::scheduler::bucket bucket{ 0 }; +} + +TEST (bucket, insert_zero) +{ + nano::scheduler::bucket bucket{ 0 }; + auto block = block0 (); + auto drop = bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 0 }, block); + ASSERT_EQ (drop, block); +} + +TEST (bucket, push_available) +{ + nano::scheduler::bucket bucket{ 1 }; + auto block = block0 (); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1000 }, block)); +} + +TEST (bucket, push_overflow_other) +{ + nano::scheduler::bucket bucket{ 1 }; + auto block0 = ::block0 (); + auto block1 = ::block1 (); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1000 }, block0)); + ASSERT_EQ (block0, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 900 }, block1)); +} + +TEST (bucket, push_overflow_self) +{ + nano::scheduler::bucket bucket{ 1 }; + auto block0 = ::block0 (); + auto block1 = ::block1 (); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1000 }, block0)); + ASSERT_EQ (block1, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1100 }, block1)); +} + +// Inserting duplicate block should not return an overflow or reject +TEST (bucket, accept_duplicate) +{ + nano::scheduler::bucket bucket{ 1 }; + auto block0 = ::block0 (); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1000 }, block0)); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 900 }, block0)); + ASSERT_EQ (nullptr, bucket.insert (std::chrono::steady_clock::time_point{} + std::chrono::milliseconds{ 1100 }, block0)); } diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index a3c21bfb13..7c10a084e8 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -125,6 +125,9 @@ enum class detail // election_scheduler block_activated, + block_insert, // Block inserted with no overflow + block_overflow, // Block inserted and another block was removed as an overflow + block_reject, // Block was not inserted because of an overflow // vote_generator candidate_processed, @@ -214,4 +217,4 @@ struct magic_enum::customize::enum_range { static constexpr int min = 0; static constexpr int max = 512; -}; \ No newline at end of file +}; diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index ec525aa955..81f78e70d4 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -135,8 +135,6 @@ add_library( request_aggregator.cpp scheduler/bucket.cpp scheduler/bucket.hpp - scheduler/buckets.cpp - scheduler/buckets.hpp scheduler/component.hpp scheduler/component.cpp scheduler/hinted.hpp diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 95e77f84b1..c76891c4a1 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -144,12 +144,12 @@ void nano::active_transactions::notify_observers (nano::secure::read_transaction void nano::active_transactions::activate_successors (nano::secure::read_transaction const & transaction, std::shared_ptr const & block) { - node.scheduler.priority.activate (block->account (), transaction); + node.scheduler.priority.activate (transaction, block->account ()); // Start or vote for the next unconfirmed block in the destination account if (block->is_send () && !block->destination ().is_zero () && block->destination () != block->account ()) { - node.scheduler.priority.activate (block->destination (), transaction); + node.scheduler.priority.activate (transaction, block->destination ()); } } @@ -287,7 +287,7 @@ void nano::active_transactions::cleanup_election (nano::unique_lock lock_a.unlock (); - vacancy_update (); + election_stopped.notify (election); for (auto const & [hash, block] : blocks_l) { @@ -436,7 +436,6 @@ nano::election_insertion_result nano::active_transactions::insert (std::shared_p trigger_vote_cache (hash); node.observers.active_started.notify (hash); - vacancy_update (); } // Votes are generated for inserted or ongoing elections @@ -638,7 +637,6 @@ void nano::active_transactions::clear () blocks.clear (); roots.clear (); } - vacancy_update (); } std::unique_ptr nano::collect_container_info (active_transactions & active_transactions, std::string const & name) diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 1fd12ae26f..915d9fadfd 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -183,7 +183,7 @@ class active_transactions final * How many election slots are available for specified election type */ int64_t vacancy (nano::election_behavior behavior = nano::election_behavior::normal) const; - std::function vacancy_update{ [] () {} }; + nano::observer_set> election_stopped; std::size_t election_winner_details_size (); void add_election_winner_details (nano::block_hash const &, std::shared_ptr const &); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 7fa7ac4e3f..16e9b4be42 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -372,6 +374,22 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction nano::log::arg{ "forced", forced_a }, nano::log::arg{ "block", block }); + if (result == nano::block_status::progress) + { + auto removed = node.scheduler.priority.activate (transaction_a, block->account ()); + if (removed != nullptr && node.ledger.any.block_exists (transaction_a, removed->hash ())) + { + node.ledger.rollback (transaction_a, removed->hash ()); + if (removed->hash () == block->hash ()) + { + return nano::block_status::overflow; + } + else + { + std::cerr << "replace\n"; + } + } + } switch (result) { case nano::block_status::progress: @@ -449,6 +467,10 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction { break; } + case nano::block_status::overflow: + { + break; + } } return result; } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a815235254..b68b9b4699 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -222,7 +222,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }; backlog.activate_callback.add ([this] (secure::transaction const & transaction, nano::account const & account) { - scheduler.priority.activate (account, transaction); + scheduler.priority.activate (transaction, account); scheduler.optimistic.activate (transaction, account); }); @@ -248,11 +248,11 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy if (!init_error ()) { // Notify election schedulers when AEC frees election slot - active.vacancy_update = [this] () { - scheduler.priority.notify (); + active.election_stopped.add ([this] (std::shared_ptr election) { + scheduler.priority.election_stopped (election); scheduler.hinted.notify (); scheduler.optimistic.notify (); - }; + }); wallets.observer = [this] (bool active) { observers.wallet.notify (active); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index ce744cdfc1..c46a4413c6 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -58,6 +59,7 @@ class node_config std::optional peering_port{}; nano::scheduler::optimistic_config optimistic_scheduler; nano::scheduler::hinted_config hinted_scheduler; + nano::scheduler::priority_config priority_scheduler; std::vector> work_peers; std::vector> secondary_work_peers{ { "127.0.0.1", 8076 } }; /* Default of nano-pow-server */ std::vector preconfigured_peers; diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index 6d1d66ec77..c052ae549d 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -44,10 +44,10 @@ void nano::process_live_dispatcher::inspect (nano::block_status const & result, void nano::process_live_dispatcher::process_live (nano::block const & block, secure::transaction const & transaction) { // Start collecting quorum on block - if (ledger.dependents_confirmed (transaction, block)) + /*if (ledger.dependents_confirmed (transaction, block)) { scheduler.activate (block.account (), transaction); - } + }*/ if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) { diff --git a/nano/node/scheduler/bucket.cpp b/nano/node/scheduler/bucket.cpp index 51ba1626a5..b435d5c3aa 100644 --- a/nano/node/scheduler/bucket.cpp +++ b/nano/node/scheduler/bucket.cpp @@ -1,62 +1,34 @@ #include #include -bool nano::scheduler::bucket::value_type::operator< (value_type const & other_a) const +nano::block_hash nano::scheduler::bucket::entry_t::hash () const { - return time < other_a.time || (time == other_a.time && block->hash () < other_a.block->hash ()); + return block->hash (); } -bool nano::scheduler::bucket::value_type::operator== (value_type const & other_a) const +nano::scheduler::bucket::bucket (size_t max) : + max{ max } { - return time == other_a.time && block->hash () == other_a.block->hash (); } -nano::scheduler::bucket::bucket (size_t maximum) : - maximum{ maximum } +std::shared_ptr nano::scheduler::bucket::insert (std::chrono::steady_clock::time_point time, std::shared_ptr block) { - debug_assert (maximum > 0); -} - -nano::scheduler::bucket::~bucket () -{ -} - -std::shared_ptr nano::scheduler::bucket::top () const -{ - debug_assert (!queue.empty ()); - return queue.begin ()->block; -} - -void nano::scheduler::bucket::pop () -{ - debug_assert (!queue.empty ()); - queue.erase (queue.begin ()); -} - -void nano::scheduler::bucket::push (uint64_t time, std::shared_ptr block) -{ - queue.insert ({ time, block }); - if (queue.size () > maximum) + std::lock_guard lock{ mutex }; + backlog.insert (entry_t{ time, block }); + debug_assert (backlog.size () <= max + 1); // One extra at most + if (backlog.size () <= max) { - debug_assert (!queue.empty ()); - queue.erase (--queue.end ()); + return nullptr; } + auto newest = backlog.begin (); // The first item in descending order has the highest timestamp i.e. it is the newest. + auto discard = newest->block; + backlog.erase (newest); + debug_assert (backlog.size () <= max); + return discard; } -size_t nano::scheduler::bucket::size () const -{ - return queue.size (); -} - -bool nano::scheduler::bucket::empty () const +size_t nano::scheduler::bucket::erase (nano::block_hash const & hash) { - return queue.empty (); -} - -void nano::scheduler::bucket::dump () const -{ - for (auto const & item : queue) - { - std::cerr << item.time << ' ' << item.block->hash ().to_string () << '\n'; - } + std::lock_guard lock{ mutex }; + return backlog.get ().erase (hash); } diff --git a/nano/node/scheduler/bucket.hpp b/nano/node/scheduler/bucket.hpp index 2f32c17d59..c2fade950a 100644 --- a/nano/node/scheduler/bucket.hpp +++ b/nano/node/scheduler/bucket.hpp @@ -1,13 +1,24 @@ #pragma once +#include +#include + +#include +#include +#include +#include +#include + +#include #include #include #include -#include +#include namespace nano { class block; +class logger; } namespace nano::scheduler { @@ -15,25 +26,35 @@ namespace nano::scheduler */ class bucket final { - class value_type + class tag_time + { + }; + class tag_hash + { + }; + class entry_t { public: - uint64_t time; + std::chrono::steady_clock::time_point time; std::shared_ptr block; - bool operator< (value_type const & other_a) const; - bool operator== (value_type const & other_a) const; + nano::block_hash hash () const; }; - std::set queue; - size_t const maximum; + using backlog_t = boost::multi_index_container, + boost::multi_index::member, + std::greater>, // Sorted by last-confirmed time in descending order + boost::multi_index::hashed_unique, + boost::multi_index::const_mem_fun>>>; public: - bucket (size_t maximum); - ~bucket (); - std::shared_ptr top () const; - void pop (); - void push (uint64_t time, std::shared_ptr block); - size_t size () const; - bool empty () const; - void dump () const; + bucket (size_t max); + std::shared_ptr insert (std::chrono::steady_clock::time_point time, std::shared_ptr block); + size_t erase (nano::block_hash const & hash); + +private: + backlog_t backlog; + std::mutex mutex; + size_t const max; }; } // namespace nano::scheduler diff --git a/nano/node/scheduler/buckets.cpp b/nano/node/scheduler/buckets.cpp deleted file mode 100644 index c033f6f6ca..0000000000 --- a/nano/node/scheduler/buckets.cpp +++ /dev/null @@ -1,150 +0,0 @@ -#include -#include -#include -#include - -#include - -/** Moves the bucket pointer to the next bucket */ -void nano::scheduler::buckets::next () -{ - ++current; - if (current == buckets_m.end ()) - { - current = buckets_m.begin (); - } -} - -/** Seek to the next non-empty bucket, if one exists */ -void nano::scheduler::buckets::seek () -{ - next (); - for (std::size_t i = 0, n = buckets_m.size (); (*current)->empty () && i < n; ++i) - { - next (); - } -} - -/** - * Prioritization constructor, construct a container containing approximately 'maximum' number of blocks. - * @param maximum number of blocks that this container can hold, this is a soft and approximate limit. - */ -nano::scheduler::buckets::buckets (uint64_t maximum) : - maximum{ maximum } -{ - auto build_region = [this] (uint128_t const & begin, uint128_t const & end, size_t count) { - auto width = (end - begin) / count; - for (auto i = 0; i < count; ++i) - { - minimums.push_back (begin + i * width); - } - }; - minimums.push_back (uint128_t{ 0 }); - build_region (uint128_t{ 1 } << 88, uint128_t{ 1 } << 92, 2); - build_region (uint128_t{ 1 } << 92, uint128_t{ 1 } << 96, 4); - build_region (uint128_t{ 1 } << 96, uint128_t{ 1 } << 100, 8); - build_region (uint128_t{ 1 } << 100, uint128_t{ 1 } << 104, 16); - build_region (uint128_t{ 1 } << 104, uint128_t{ 1 } << 108, 16); - build_region (uint128_t{ 1 } << 108, uint128_t{ 1 } << 112, 8); - build_region (uint128_t{ 1 } << 112, uint128_t{ 1 } << 116, 4); - build_region (uint128_t{ 1 } << 116, uint128_t{ 1 } << 120, 2); - minimums.push_back (uint128_t{ 1 } << 120); - auto bucket_max = std::max (1u, maximum / minimums.size ()); - for (size_t i = 0u, n = minimums.size (); i < n; ++i) - { - buckets_m.push_back (std::make_unique (bucket_max)); - } - current = buckets_m.begin (); -} - -nano::scheduler::buckets::~buckets () -{ -} - -std::size_t nano::scheduler::buckets::index (nano::uint128_t const & balance) const -{ - auto index = std::upper_bound (minimums.begin (), minimums.end (), balance) - minimums.begin () - 1; - return index; -} - -/** - * Push a block and its associated time into the prioritization container. - * The time is given here because sideband might not exist in the case of state blocks. - */ -void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr block, nano::amount const & priority) -{ - auto was_empty = empty (); - auto & bucket = buckets_m[index (priority.number ())]; - bucket->push (time, block); - if (was_empty) - { - seek (); - } -} - -/** Return the highest priority block of the current bucket */ -std::shared_ptr nano::scheduler::buckets::top () const -{ - debug_assert (!empty ()); - auto result = (*current)->top (); - return result; -} - -/** Pop the current block from the container and seek to the next block, if it exists */ -void nano::scheduler::buckets::pop () -{ - debug_assert (!empty ()); - auto & bucket = *current; - bucket->pop (); - seek (); -} - -/** Returns the total number of blocks in buckets */ -std::size_t nano::scheduler::buckets::size () const -{ - std::size_t result{ 0 }; - for (auto const & bucket : buckets_m) - { - result += bucket->size (); - } - return result; -} - -/** Returns number of buckets, 62 by default */ -std::size_t nano::scheduler::buckets::bucket_count () const -{ - return buckets_m.size (); -} - -/** Returns number of items in bucket with index 'index' */ -std::size_t nano::scheduler::buckets::bucket_size (std::size_t index) const -{ - return buckets_m[index]->size (); -} - -/** Returns true if all buckets are empty */ -bool nano::scheduler::buckets::empty () const -{ - return std::all_of (buckets_m.begin (), buckets_m.end (), [] (auto const & bucket) { return bucket->empty (); }); -} - -/** Print the state of the class in stderr */ -void nano::scheduler::buckets::dump () const -{ - for (auto const & bucket : buckets_m) - { - bucket->dump (); - } - std::cerr << "current: " << current - buckets_m.begin () << '\n'; -} - -std::unique_ptr nano::scheduler::buckets::collect_container_info (std::string const & name) -{ - auto composite = std::make_unique (name); - for (auto i = 0; i < buckets_m.size (); ++i) - { - auto const & bucket = buckets_m[i]; - composite->add_component (std::make_unique (container_info{ std::to_string (i), bucket->size (), 0 })); - } - return composite; -} diff --git a/nano/node/scheduler/buckets.hpp b/nano/node/scheduler/buckets.hpp deleted file mode 100644 index 967b4408f7..0000000000 --- a/nano/node/scheduler/buckets.hpp +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once -#include -#include - -#include -#include -#include -#include - -namespace nano -{ -class block; -} -namespace nano::scheduler -{ -class bucket; -/** A container for holding blocks and their arrival/creation time. - * - * The container consists of a number of buckets. Each bucket holds an ordered set of 'value_type' items. - * The buckets are accessed in a round robin fashion. The index 'current' holds the index of the bucket to access next. - * When a block is inserted, the bucket to go into is determined by the account balance and the priority inside that - * bucket is determined by its creation/arrival time. - * - * The arrival/creation time is only an approximation and it could even be wildly wrong, - * for example, in the event of bootstrapped blocks. - */ -class buckets final -{ - /** container for the buckets to be read in round robin fashion */ - std::deque> buckets_m; - - /** thresholds that define the bands for each bucket, the minimum balance an account must have to enter a bucket, - * the container writes a block to the lowest indexed bucket that has balance larger than the bucket's minimum value */ - std::deque minimums; - - /** index of bucket to read next */ - decltype (buckets_m)::const_iterator current; - - /** maximum number of blocks in whole container, each bucket's maximum is maximum / bucket_number */ - uint64_t const maximum; - - void next (); - void seek (); - -public: - buckets (uint64_t maximum = 250000u); - ~buckets (); - void push (uint64_t time, std::shared_ptr block, nano::amount const & priority); - std::shared_ptr top () const; - void pop (); - std::size_t size () const; - std::size_t bucket_count () const; - std::size_t bucket_size (std::size_t index) const; - bool empty () const; - void dump () const; - std::size_t index (nano::uint128_t const & balance) const; - - std::unique_ptr collect_container_info (std::string const &); -}; -} // namespace nano::scheduler diff --git a/nano/node/scheduler/component.cpp b/nano/node/scheduler/component.cpp index 0d44042448..57b58a3c02 100644 --- a/nano/node/scheduler/component.cpp +++ b/nano/node/scheduler/component.cpp @@ -26,7 +26,6 @@ void nano::scheduler::component::start () hinted.start (); manual.start (); optimistic.start (); - priority.start (); } void nano::scheduler::component::stop () @@ -34,7 +33,6 @@ void nano::scheduler::component::stop () hinted.stop (); manual.stop (); optimistic.stop (); - priority.stop (); } std::unique_ptr nano::scheduler::component::collect_container_info (std::string const & name) diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index d35fb7b105..a6ed8feaaf 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include #include @@ -10,131 +10,135 @@ nano::scheduler::priority::priority (nano::node & node_a, nano::stats & stats_a) : node{ node_a }, - stats{ stats_a }, - buckets{ std::make_unique () } + stats{ stats_a } { + setup_buckets (); } nano::scheduler::priority::~priority () { - // Thread must be stopped before destruction - debug_assert (!thread.joinable ()); } -void nano::scheduler::priority::start () -{ - debug_assert (!thread.joinable ()); - - thread = std::thread{ [this] () { - nano::thread_role::set (nano::thread_role::name::scheduler_priority); - run (); - } }; -} - -void nano::scheduler::priority::stop () -{ - { - nano::lock_guard lock{ mutex }; - stopped = true; - } - notify (); - nano::join_or_pass (thread); -} - -bool nano::scheduler::priority::activate (nano::account const & account, secure::transaction const & transaction) +std::shared_ptr nano::scheduler::priority::activate (secure::transaction const & transaction, nano::account const & account) { debug_assert (!account.is_zero ()); auto head = node.ledger.confirmed.account_head (transaction, account); if (node.ledger.any.account_head (transaction, account) == head) { - return false; + return nullptr; } auto block = node.ledger.any.block_get (transaction, node.ledger.any.block_successor (transaction, { head.is_zero () ? static_cast (account) : head, head }).value ()); if (!node.ledger.dependents_confirmed (transaction, *block)) { - return false; + return nullptr; } - auto const balance_priority = std::max (block->balance ().number (), node.ledger.confirmed.block_balance (transaction, head).value_or (0).number ()); - auto const time_priority = !head.is_zero () ? node.ledger.confirmed.block_get (transaction, head)->sideband ().timestamp : nano::seconds_since_epoch (); // New accounts get current timestamp i.e. lowest priority + return activate (transaction, block); +} + +std::shared_ptr nano::scheduler::priority::activate (secure::transaction const & transaction, std::shared_ptr const & block) +{ + auto account = block->account (); + auto head = node.ledger.confirmed.account_head (transaction, account); + auto const balance_priority = std::max (block->balance (), node.ledger.any.block_balance (transaction, block->previous ()).value_or (0)); + auto timestamp_calculation = [&] () { + std::chrono::milliseconds diff{ 0 }; + if (!head.is_zero ()) + { + auto timestamp = node.ledger.confirmed.block_get (transaction, head)->sideband ().timestamp; + diff = std::chrono::seconds{ nano::seconds_since_epoch () - timestamp }; + } + // Use clock with higher precision than seconds + auto time = std::chrono::steady_clock::now () - diff; // New accounts get current timestamp i.e. lowest priority + return time; + }; + auto const time_priority = timestamp_calculation (); node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::activated); node.logger.trace (nano::log::type::election_scheduler, nano::log::detail::block_activated, nano::log::arg{ "account", account.to_account () }, // TODO: Convert to lazy eval - nano::log::arg{ "block", block }, - nano::log::arg{ "time", time_priority }, + nano::log::arg{ "block", block->hash ().to_string () }, + nano::log::arg{ "time", time_priority.time_since_epoch ().count () }, nano::log::arg{ "priority", balance_priority }); - nano::lock_guard lock{ mutex }; - buckets->push (time_priority, block, balance_priority); - notify (); - - return true; // Activated -} - -void nano::scheduler::priority::notify () -{ - condition.notify_all (); -} - -std::size_t nano::scheduler::priority::size () const -{ - nano::lock_guard lock{ mutex }; - return buckets->size (); -} - -bool nano::scheduler::priority::empty_locked () const -{ - return buckets->empty (); + nano::unique_lock lock{ mutex }; + if (tracking.find (block->hash ()) != tracking.end ()) + { + return nullptr; + } + auto iter = buckets.upper_bound (balance_priority); + --iter; // Iterator points to bucket after the target priority + debug_assert (iter != buckets.end ()); + auto & bucket = *iter->second; + auto removed = bucket.insert (time_priority, block); + if (removed == nullptr) + { + node.logger.trace (nano::log::type::election_scheduler, nano::log::detail::block_insert, + nano::log::arg{ "block", block->hash ().to_string () }, + nano::log::arg{ "time", time_priority.time_since_epoch ().count () }); + // Bucket was not at full capacity + auto inserted = tracking.emplace (block->hash (), &bucket); + lock.unlock (); + node.active.insert (block); + debug_assert (inserted.second); + return nullptr; + } + else if (removed != block) + { + node.logger.trace (nano::log::type::election_scheduler, nano::log::detail::block_overflow, + nano::log::arg{ "block", block->hash ().to_string () }, + nano::log::arg{ "time", time_priority.time_since_epoch ().count () }); + // Bucket was full and another block was lowest priority + auto inserted = tracking.emplace (block->hash (), &bucket); + lock.unlock (); + node.active.erase (*removed); + node.active.insert (block); + debug_assert (inserted.second); + return removed; + } + else + { + node.logger.trace (nano::log::type::election_scheduler, nano::log::detail::block_reject, + nano::log::arg{ "block", block->hash ().to_string () }, + nano::log::arg{ "time", time_priority.time_since_epoch ().count () }); + // Bucket was full and block inserted was lowest priority + // Drop + return block; + } } -bool nano::scheduler::priority::empty () const +void nano::scheduler::priority::election_stopped (std::shared_ptr election) { nano::lock_guard lock{ mutex }; - return empty_locked (); -} - -bool nano::scheduler::priority::predicate () const -{ - return node.active.vacancy () > 0 && !buckets->empty (); + for (auto const & [hash, block] : election->blocks ()) + { + if (auto existing = tracking.find (hash); existing != tracking.end ()) + { + auto erased = existing->second->erase (hash); + debug_assert (erased == 1); + tracking.erase (existing); + } + } } -void nano::scheduler::priority::run () +void nano::scheduler::priority::setup_buckets () { - nano::unique_lock lock{ mutex }; - while (!stopped) - { - condition.wait (lock, [this] () { - return stopped || predicate (); - }); - debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds - if (!stopped) + auto build_region = [this] (uint128_t const & begin, uint128_t const & end, size_t count) { + auto width = (end - begin) / count; + for (auto i = 0; i < count; ++i) { - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - - if (predicate ()) - { - auto block = buckets->top (); - buckets->pop (); - lock.unlock (); - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); - auto result = node.active.insert (block); - if (result.inserted) - { - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); - } - if (result.election != nullptr) - { - result.election->transition_active (); - } - } - else - { - lock.unlock (); - } - notify (); - lock.lock (); + buckets.emplace (begin + i * width, std::make_unique (node.config.priority_scheduler.depth)); } - } + }; + build_region (0, uint128_t{ 1 } << 88, 1); + build_region (uint128_t{ 1 } << 88, uint128_t{ 1 } << 92, 2); + build_region (uint128_t{ 1 } << 92, uint128_t{ 1 } << 96, 4); + build_region (uint128_t{ 1 } << 96, uint128_t{ 1 } << 100, 8); + build_region (uint128_t{ 1 } << 100, uint128_t{ 1 } << 104, 16); + build_region (uint128_t{ 1 } << 104, uint128_t{ 1 } << 108, 16); + build_region (uint128_t{ 1 } << 108, uint128_t{ 1 } << 112, 8); + build_region (uint128_t{ 1 } << 112, uint128_t{ 1 } << 116, 4); + build_region (uint128_t{ 1 } << 116, uint128_t{ 1 } << 120, 2); + build_region (uint128_t{ 1 } << 120, uint128_t{ 1 } << 127, 1); } std::unique_ptr nano::scheduler::priority::collect_container_info (std::string const & name) @@ -142,6 +146,5 @@ std::unique_ptr nano::scheduler::priority::colle nano::unique_lock lock{ mutex }; auto composite = std::make_unique (name); - composite->add_component (buckets->collect_container_info ("buckets")); return composite; } diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index 31c220ed3a..b28164952f 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,7 @@ namespace nano { class block; class container_info_component; +class election; class node; class stats; } @@ -21,27 +23,35 @@ namespace nano::secure { class transaction; } +namespace nano::scheduler +{ +class bucket; +} namespace nano::scheduler { -class buckets; +class priority_config final +{ +public: + // nano::error deserialize (nano::tomlconfig & toml); + // nano::error serialize (nano::tomlconfig & toml) const; + +public: + size_t depth{ 128 }; +}; class priority final { public: priority (nano::node &, nano::stats &); ~priority (); - void start (); - void stop (); - /** * Activates the first unconfirmed block of \p account_a * @return true if account was activated */ - bool activate (nano::account const &, secure::transaction const &); - void notify (); - std::size_t size () const; - bool empty () const; + std::shared_ptr activate (secure::transaction const & transaction, nano::account const & account); + std::shared_ptr activate (secure::transaction const & transaction, std::shared_ptr const & block); + void election_stopped (std::shared_ptr election); std::unique_ptr collect_container_info (std::string const & name); @@ -50,15 +60,10 @@ class priority final nano::stats & stats; private: - void run (); - bool empty_locked () const; - bool predicate () const; - - std::unique_ptr buckets; + void setup_buckets (); - bool stopped{ false }; - nano::condition_variable condition; + std::unordered_map tracking; + std::map> buckets; mutable nano::mutex mutex; - std::thread thread; }; } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index fd832bcd84..87cf567815 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -2456,7 +2456,7 @@ TEST (rpc, account_representative_set_epoch_2_insufficient_work) ASSERT_NE (nullptr, system.upgrade_genesis_epoch (*node, nano::epoch::epoch_2)); // speed up the cementing process, otherwise the node waits for frontiers confirmation to notice the unconfirmed epoch blocks, which takes time - node->scheduler.priority.activate (nano::dev::genesis_key.pub, node->ledger.tx_begin_read ()); + node->scheduler.priority.activate (node->ledger.tx_begin_read (), nano::dev::genesis_key.pub); // wait for the epoch blocks to be cemented ASSERT_TIMELY_EQ (5s, node->ledger.confirmed.account_height (node->ledger.tx_begin_read (), nano::dev::genesis_key.pub), 3); diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 9007f7bb71..7dd3cc6eed 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -225,7 +225,8 @@ enum class block_status balance_mismatch, // Balance and amount delta don't match representative_mismatch, // Representative is changed when it is not allowed block_position, // This block cannot follow the previous block - insufficient_work // Insufficient work for this block, even though it passed the minimal validation + insufficient_work, // Insufficient work for this block, even though it passed the minimal validation + overflow // Insufficient space in scheduler }; std::string_view to_string (block_status); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index dccc4c4a16..a54bf43a4b 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -2167,7 +2167,7 @@ TEST (system, block_sequence) std::string message; for (auto i : system.nodes) { - message += boost::str (boost::format ("N:%1% b:%2% c:%3% a:%4% s:%5% p:%6%\n") % std::to_string (i->network.port) % std::to_string (i->ledger.block_count ()) % std::to_string (i->ledger.cemented_count ()) % std::to_string (i->active.size ()) % std::to_string (i->scheduler.priority.size ()) % std::to_string (i->network.size ())); + message += boost::str (boost::format ("N:%1% b:%2% c:%3% a:%4% p:%5%\n") % std::to_string (i->network.port) % std::to_string (i->ledger.block_count ()) % std::to_string (i->ledger.cemented_count ()) % std::to_string (i->active.size ()) % std::to_string (i->network.size ())); nano::lock_guard lock{ i->active.mutex }; for (auto const & j : i->active.roots) {