diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index fba807e32..0c3812681 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -176,6 +176,26 @@ jobs: - name: run run: ./zig-out/bin/fuzz allocators 19 10000 + ledger_fuzz: + strategy: + matrix: + os: [ubuntu-latest] + runs-on: ${{matrix.os}} + timeout-minutes: 60 + steps: + - name: checkout + uses: actions/checkout@v2 + with: + submodules: recursive + - name: setup zig + uses: mlugg/setup-zig@v1 + with: + version: 0.13.0 + - name: build + run: zig build -Doptimize=ReleaseSafe -Dno-run fuzz + - name: run + run: ./zig-out/bin/fuzz ledger 19 10000 + # benchmarks: # if: ${{ github.ref != 'refs/heads/main' }} # strategy: diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md index 9f7dae3eb..272622617 100644 --- a/docs/CONTRIBUTING.md +++ b/docs/CONTRIBUTING.md @@ -31,6 +31,23 @@ There are two main guidelines to keep in mind when naming interfaces and interfa - As an example, a generic implementation of `std.Random` which is a pseudo-random number generator should be named `prng` (ie this is relevant when making appropriate use of `std.Random.DefaultPrng`). - As another example, an instance of `std.heap.GeneralPurposeAllocator(config)` should be called `gpa_state`, `std.heap.ArenaAllocator` `arena_state`, and so on. +#### Method Parameters +The first parameter of a method should be named `self`. The type should be the name of the struct. +For example: + +```zig +const MyStruct = struct { + state: u8, + + fn write(self: *MyStruct, new_state: u8) void { + self.state = new_state; + } +}; +``` + +If the type name is not available (for example in anonymous structs), define `const Self = @This()` +and use that as the type. + ### Files as Structs We prohibit usage of files as instantiable struct types in the codebase. diff --git a/scripts/benchmark_server.py b/scripts/benchmark_server.py index a371206d3..4bd7ccecd 100644 --- a/scripts/benchmark_server.py +++ b/scripts/benchmark_server.py @@ -1,3 +1,7 @@ +# doc: +# this script scrapes the results/metrics directory for json files +# and generates a graph for each metric in the json file + import os import json import plotly.express as px diff --git a/scripts/collect_benchmarks.sh b/scripts/collect_benchmarks.sh index d30f9278d..9a46d4b3d 100644 --- a/scripts/collect_benchmarks.sh +++ b/scripts/collect_benchmarks.sh @@ -1,7 +1,13 @@ #!/usr/bin/env bash -# crontab -e -# 0 5 * * * bash /home/ubuntu/benchmarks/sig/scripts/collect_benchmarks.sh +# doc: +# this script will pull the latest change of the local repo +# and run the benchmark to collect metrics which are +# saved as results/output.json file. they are then +# moved to results/metrics/output-{commit}-{timestamp}.json +# +# these output files are then compared/visualized using the +# scripts/benchmark_server.py script # now in the scripts/ dir cd "$(dirname "$0")" @@ -22,7 +28,8 @@ if ls $result_file 1> /dev/null 2>&1; then echo "Results for commit $git_commit already exist. Skipping benchmark." else # Run the benchmark only if the result file doesn't exist - zig build -Doptimize=ReleaseSafe benchmark -- --metrics all + zig build -Doptimize=ReleaseSafe -Dno-run benchmark + ./zig-out/bin/benchmark --metrics -e -f all mv results/output.json "${result_dir}/output-${git_commit}-${timestamp}.json" echo "Benchmark results saved to ${result_dir}/output-${git_commit}-${timestamp}.json" diff --git a/scripts/cron_jobs/setup_benchmarks.sh b/scripts/cron_jobs/setup_benchmarks.sh new file mode 100644 index 000000000..d0a4dda6f --- /dev/null +++ b/scripts/cron_jobs/setup_benchmarks.sh @@ -0,0 +1,13 @@ +# doc: +# this script will modify your server's crontab +# to run the collect_benchmarks.sh script at 6am everyday + +SCRIPT_DIR=$(dirname "$(readlink -f "$0")")/.. + +# 6am everyday +(crontab -l; echo "\ +0 6 * * * . $HOME/.bashrc; (bash $SCRIPT_DIR/collect_benchmarks.sh) 2>&1 | logger -t sig_bench \ +") | crontab + +echo "Cron job added. Current crontab:" +crontab -l diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 2bb21669d..9d36754b0 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -291,9 +291,8 @@ pub const AccountsDB = struct { var timer = try sig.time.Timer.start(); var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{}); defer fastload_dir.close(); - self.logger.info().log("fast loading accountsdb..."); try self.fastload(fastload_dir, collapsed_manifest.accounts_db_fields); - self.logger.info().logf("loaded from snapshot in {s}", .{timer.read()}); + self.logger.info().logf("fastload: total time: {s}", .{timer.read()}); } else { const load_duration = try self.loadFromSnapshot( collapsed_manifest.accounts_db_fields, @@ -301,15 +300,14 @@ pub const AccountsDB = struct { allocator, accounts_per_file_estimate, ); - self.logger.info().logf("loaded from snapshot in {s}", .{load_duration}); + self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_duration}); } // no need to re-save if we just loaded from a fastload - if (!should_fastload and save_index) { - var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{}); - defer fastload_dir.close(); - - try self.account_index.saveToDisk(fastload_dir); + if (save_index and !should_fastload) { + var timer = try sig.time.Timer.start(); + _ = try self.saveStateForFastload(); + self.logger.info().logf("saveStateForFastload: total time: {s}", .{timer.read()}); } if (validate) { @@ -331,17 +329,28 @@ pub const AccountsDB = struct { .capitalization = inc_persistence.incremental_capitalization, } else null, }); - self.logger.info().logf("validated from snapshot in {s}", .{validate_timer.read()}); + self.logger.info().logf("validateLoadFromSnapshot: total time: {s}", .{validate_timer.read()}); } return collapsed_manifest; } + pub fn saveStateForFastload( + self: *Self, + ) !void { + self.logger.info().log("running saveStateForFastload..."); + var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{}); + defer fastload_dir.close(); + try self.account_index.saveToDisk(fastload_dir); + } + pub fn fastload( self: *Self, dir: std.fs.Dir, snapshot_manifest: AccountsDbFields, ) !void { + self.logger.info().log("running fastload..."); + var accounts_dir = try self.snapshot_dir.openDir("accounts", .{}); defer accounts_dir.close(); @@ -383,7 +392,6 @@ pub const AccountsDB = struct { } // NOTE: index loading was the most expensive part which we fastload here - self.logger.info().log("loading account index"); try self.account_index.loadFromDisk(dir); } @@ -397,7 +405,7 @@ pub const AccountsDB = struct { per_thread_allocator: std.mem.Allocator, accounts_per_file_estimate: u64, ) !sig.time.Duration { - self.logger.info().log("loading from snapshot..."); + self.logger.info().log("running loadFromSnapshot..."); // used to read account files const n_parse_threads = n_threads; @@ -472,12 +480,11 @@ pub const AccountsDB = struct { try geyser_writer.writePayloadToPipe(end_of_snapshot); } - self.logger.info().logf("[{d} threads]: merging thread indexes...", .{n_combine_threads}); var merge_timer = try sig.time.Timer.start(); try self.mergeMultipleDBs(loading_threads, n_combine_threads); - self.logger.debug().logf("merging thread indexes took: {}", .{merge_timer.read()}); + self.logger.debug().logf("mergeMultipleDBs: total time: {}", .{merge_timer.read()}); - self.logger.debug().logf("total time: {s}", .{timer.read()}); + self.logger.debug().logf("loadFromSnapshot: total time: {s}", .{timer.read()}); return timer.read(); } @@ -768,6 +775,8 @@ pub const AccountsDB = struct { thread_dbs: []AccountsDB, n_threads: usize, ) !void { + self.logger.info().logf("[{d} threads]: running mergeMultipleDBs...", .{n_threads}); + var merge_indexes_wg: std.Thread.WaitGroup = .{}; defer merge_indexes_wg.wait(); try spawnThreadTasks(mergeThreadIndexesMultiThread, .{ @@ -928,8 +937,8 @@ pub const AccountsDB = struct { ) !struct { Hash, u64 } { var timer = try sig.time.Timer.start(); // TODO: make cli arg - const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount())) * 2; - // const n_threads = 1; + const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount())); + // const n_threads = 4; // alloc the result const hashes = try self.allocator.alloc(std.ArrayListUnmanaged(Hash), n_threads); @@ -944,7 +953,10 @@ pub const AccountsDB = struct { @memset(lamports, 0); // split processing the bins over muliple threads - self.logger.info().logf("collecting hashes from accounts...", .{}); + self.logger.info().logf( + "collecting hashes from accounts using {} threads...", + .{n_threads}, + ); if (n_threads == 1) { try getHashesFromIndex( self, @@ -1044,17 +1056,17 @@ pub const AccountsDB = struct { if (params.expected_full.accounts_hash.order(&accounts_hash) != .eq) { self.logger.err().logf( - \\ incorrect accounts hash - \\ expected vs calculated: {d} vs {d} - , .{ params.expected_full.accounts_hash, accounts_hash }); + "incorrect accounts hash: expected vs calculated: {d} vs {d}", + .{ params.expected_full.accounts_hash, accounts_hash }, + ); return error.IncorrectAccountsHash; } if (params.expected_full.capitalization != total_lamports) { self.logger.err().logf( - \\ incorrect total lamports - \\ expected vs calculated: {d} vs {d} - , .{ params.expected_full.capitalization, total_lamports }); + "incorrect total lamports: expected vs calculated: {d} vs {d}", + .{ params.expected_full.capitalization, total_lamports }, + ); return error.IncorrectTotalLamports; } @@ -1093,17 +1105,17 @@ pub const AccountsDB = struct { if (expected_incremental.capitalization != incremental_lamports) { self.logger.err().logf( - \\ incorrect incremental lamports - \\ expected vs calculated: {d} vs {d} - , .{ expected_incremental.capitalization, incremental_lamports }); + "incorrect incremental lamports: expected vs calculated: {d} vs {d}", + .{ expected_incremental.capitalization, incremental_lamports }, + ); return error.IncorrectIncrementalLamports; } if (expected_incremental.accounts_hash.order(&accounts_delta_hash) != .eq) { self.logger.err().logf( - \\ incorrect accounts delta hash - \\ expected vs calculated: {d} vs {d} - , .{ expected_incremental.accounts_hash, accounts_delta_hash }); + "incorrect accounts delta hash: expected vs calculated: {d} vs {d}", + .{ expected_incremental.accounts_hash, accounts_delta_hash }, + ); return error.IncorrectAccountsDeltaHash; } @@ -3199,6 +3211,15 @@ pub fn indexAndValidateAccountFile( accounts_file.number_of_accounts = number_of_accounts; } +pub fn getAccountPerFileEstimateFromCluster( + cluster: sig.core.Cluster, +) error{NotImplementedYet}!u64 { + return switch (cluster) { + .testnet => 1_000, + else => error.NotImplementedYet, + }; +} + /// All entries in `manifest.accounts_db_fields.file_map` must correspond to an entry in `file_map`, /// with the association defined by the file id (a field of the value of the former, the key of the latter). pub fn writeSnapshotTarWithFields( @@ -4390,15 +4411,6 @@ test "generate snapshot & update gossip snapshot hashes" { } } -pub fn getAccountPerFileEstimateFromCluster( - cluster: sig.core.Cluster, -) error{NotImplementedYet}!u64 { - return switch (cluster) { - .testnet => 500, - else => error.NotImplementedYet, - }; -} - pub const BenchmarkAccountsDBSnapshotLoad = struct { pub const min_iterations = 1; pub const max_iterations = 1; @@ -4425,13 +4437,18 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct { pub fn loadAndVerifySnapshot(units: BenchTimeUnit, bench_args: BenchArgs) !struct { load_time: u64, validate_time: u64, + fastload_save_time: u64, + fastload_time: u64, } { const allocator = std.heap.c_allocator; var print_logger = sig.trace.DirectPrintLogger.init(allocator, .debug); const logger = print_logger.logger(); // unpack the snapshot - var snapshot_dir = std.fs.cwd().openDir(SNAPSHOT_DIR_PATH, .{ .iterate = true }) catch { + var snapshot_dir = std.fs.cwd().openDir( + SNAPSHOT_DIR_PATH, + .{ .iterate = true }, + ) catch { // not snapshot -> early exit std.debug.print( "need to setup a snapshot in {s} for this benchmark...\n", @@ -4441,6 +4458,8 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct { return .{ .load_time = zero_duration.asNanos(), .validate_time = zero_duration.asNanos(), + .fastload_save_time = zero_duration.asNanos(), + .fastload_time = zero_duration.asNanos(), }; }; defer snapshot_dir.close(); @@ -4455,43 +4474,76 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct { defer full_inc_manifest.deinit(allocator); const collapsed_manifest = try full_inc_manifest.collapse(allocator); - var accounts_db = try AccountsDB.init(.{ - .allocator = allocator, - .logger = logger, - .snapshot_dir = snapshot_dir, - .geyser_writer = null, - .gossip_view = null, - .index_allocation = if (bench_args.use_disk) .disk else .ram, - .number_of_index_shards = 32, - .lru_size = null, - }); - defer accounts_db.deinit(); + const loading_duration, const fastload_save_duration, const validate_duration = duration_blk: { + var accounts_db = try AccountsDB.init(.{ + .allocator = allocator, + .logger = logger, + .snapshot_dir = snapshot_dir, + .geyser_writer = null, + .gossip_view = null, + .index_allocation = if (bench_args.use_disk) .disk else .ram, + .number_of_index_shards = 32, + .lru_size = null, + }); + defer accounts_db.deinit(); - const loading_duration = try accounts_db.loadFromSnapshot( - collapsed_manifest.accounts_db_fields, - bench_args.n_threads, - allocator, - try getAccountPerFileEstimateFromCluster(bench_args.cluster), - ); + const loading_duration = try accounts_db.loadFromSnapshot( + collapsed_manifest.accounts_db_fields, + bench_args.n_threads, + allocator, + try getAccountPerFileEstimateFromCluster(bench_args.cluster), + ); - const full_snapshot = full_inc_manifest.full; - var validate_timer = try sig.time.Timer.start(); - try accounts_db.validateLoadFromSnapshot(.{ - .full_slot = full_snapshot.bank_fields.slot, - .expected_full = .{ - .accounts_hash = collapsed_manifest.accounts_db_fields.bank_hash_info.accounts_hash, - .capitalization = full_snapshot.bank_fields.capitalization, - }, - .expected_incremental = if (collapsed_manifest.bank_extra.snapshot_persistence) |inc_persistence| .{ - .accounts_hash = inc_persistence.incremental_hash, - .capitalization = inc_persistence.incremental_capitalization, - } else null, - }); - const validate_duration = validate_timer.read(); + const fastload_save_duration = blk: { + var timer = try sig.time.Timer.start(); + try accounts_db.saveStateForFastload(); + break :blk timer.read(); + }; + + const full_snapshot = full_inc_manifest.full; + var validate_timer = try sig.time.Timer.start(); + try accounts_db.validateLoadFromSnapshot(.{ + .full_slot = full_snapshot.bank_fields.slot, + .expected_full = .{ + .accounts_hash = collapsed_manifest.accounts_db_fields.bank_hash_info.accounts_hash, + .capitalization = full_snapshot.bank_fields.capitalization, + }, + .expected_incremental = if (collapsed_manifest.bank_extra.snapshot_persistence) |inc_persistence| .{ + .accounts_hash = inc_persistence.incremental_hash, + .capitalization = inc_persistence.incremental_capitalization, + } else null, + }); + const validate_duration = validate_timer.read(); + + break :duration_blk .{ loading_duration, fastload_save_duration, validate_duration }; + }; + + const fastload_duration = blk: { + var fastload_accounts_db = try AccountsDB.init(.{ + .allocator = allocator, + .logger = logger, + .snapshot_dir = snapshot_dir, + .geyser_writer = null, + .gossip_view = null, + .index_allocation = if (bench_args.use_disk) .disk else .ram, + .number_of_index_shards = 32, + .lru_size = null, + }); + defer fastload_accounts_db.deinit(); + + var fastload_dir = try snapshot_dir.makeOpenPath("fastload_state", .{}); + defer fastload_dir.close(); + + var fastload_timer = try sig.time.Timer.start(); + try fastload_accounts_db.fastload(fastload_dir, collapsed_manifest.accounts_db_fields); + break :blk fastload_timer.read(); + }; return .{ .load_time = units.convertDuration(loading_duration), .validate_time = units.convertDuration(validate_duration), + .fastload_save_time = units.convertDuration(fastload_save_duration), + .fastload_time = units.convertDuration(fastload_duration), }; } }; diff --git a/src/accountsdb/download.zig b/src/accountsdb/download.zig index 4d256f3cd..24a848449 100644 --- a/src/accountsdb/download.zig +++ b/src/accountsdb/download.zig @@ -18,7 +18,8 @@ const FullAndIncrementalManifest = sig.accounts_db.FullAndIncrementalManifest; const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall; -const DOWNLOAD_PROGRESS_UPDATES_NS = 6 * std.time.ns_per_s; +// NOTE: this also represents the interval at which progress updates are issued +const DOWNLOAD_WARMUP_TIME = sig.time.Duration.fromSecs(20); const BYTE_PER_KIB = 1024; const BYTE_PER_MIB = 1024 * BYTE_PER_KIB; @@ -177,6 +178,7 @@ pub fn downloadSnapshotsFromGossip( gossip_service: *GossipService, output_dir: std.fs.Dir, min_mb_per_sec: usize, + max_number_of_download_attempts: u64, ) !struct { std.fs.File, ?std.fs.File } { const logger = logger_.withScope(LOG_SCOPE); logger @@ -194,9 +196,15 @@ pub fn downloadSnapshotsFromGossip( var slow_peer_pubkeys = std.ArrayList(Pubkey).init(allocator); defer slow_peer_pubkeys.deinit(); + var download_attempts: u64 = 0; while (true) { std.time.sleep(5 * std.time.ns_per_s); // wait while gossip table updates + if (download_attempts > max_number_of_download_attempts) { + logger.err().logf("exceeded max download attempts: {d}", .{max_number_of_download_attempts}); + return error.UnableToDownloadSnapshot; + } + // only hold gossip table lock for this block { const gossip_table, var gossip_table_lg = gossip_service.gossip_table_rw.readWithLock(); @@ -259,6 +267,8 @@ pub fn downloadSnapshotsFromGossip( "downloading full_snapshot from: {s}", .{snapshot_url.constSlice()}, ); + + defer download_attempts += 1; const full_archive_file = downloadFile( allocator, logger, @@ -270,7 +280,6 @@ pub fn downloadSnapshotsFromGossip( ) catch |err| { switch (err) { error.TooSlow => { - logger.info().logf("peer is too slow, skipping", .{}); try slow_peer_pubkeys.append(peer.contact_info.pubkey); }, else => logger.info().logf( @@ -387,7 +396,7 @@ fn downloadFile( const elapsed_since_start = full_timer.read(); const elapsed_since_prev_lap = lap_timer.read(); - if (elapsed_since_prev_lap.asNanos() <= DOWNLOAD_PROGRESS_UPDATES_NS) continue; + if (elapsed_since_prev_lap.asNanos() <= DOWNLOAD_WARMUP_TIME.asNanos()) continue; defer lap_timer.reset(); // reset at the end of the iteration, after the update, right before the next read & write. const total_bytes_left = download_size - total_bytes_read; @@ -422,12 +431,13 @@ fn downloadFile( return output_file; } +const default_adb_config = sig.cmd.config.AccountsDBConfig{}; + pub fn getOrDownloadAndUnpackSnapshot( allocator: std.mem.Allocator, logger_: Logger, - validator_dir: std.fs.Dir, /// dir which stores the snapshot files to unpack into {validator_dir}/accounts_db - maybe_snapshot_dir: ?std.fs.Dir, + snapshot_path: []const u8, options: struct { /// gossip service is not needed when loading from an existing snapshot. /// but when we need to download a new snapshot (force_new_snapshot_download flag), @@ -438,6 +448,7 @@ pub fn getOrDownloadAndUnpackSnapshot( num_threads_snapshot_unpack: u16 = 0, min_snapshot_download_speed_mbs: usize = 20, trusted_validators: ?[]const Pubkey = null, + max_number_of_download_attempts: u64 = default_adb_config.max_number_of_snapshot_download_attempts, }, ) !struct { FullAndIncrementalManifest, SnapshotFiles } { const logger = logger_.withScope(LOG_SCOPE); @@ -451,31 +462,29 @@ pub fn getOrDownloadAndUnpackSnapshot( } // check if we need to download a fresh snapshot - var accounts_db_exists = blk: { - if (validator_dir.openDir(sig.ACCOUNTS_DB_SUBDIR, .{ .iterate = true })) |dir| { - std.posix.close(dir.fd); - break :blk true; - } else |_| { - break :blk false; + var should_delete_dir = false; + if (std.fs.cwd().openDir(snapshot_path, .{ .iterate = true })) |dir| { + defer std.posix.close(dir.fd); + if (force_new_snapshot_download) { + // clear old snapshots, if we will download a new one + should_delete_dir = true; } - }; + } else |_| {} - // clear old snapshots, if we will download a new one - if (force_new_snapshot_download and accounts_db_exists) { - logger.info().log("deleting accounts_db dir..."); - try validator_dir.deleteTreeMinStackSize("accounts_db"); - accounts_db_exists = false; + if (should_delete_dir) { + logger.info().log("deleting snapshot dir..."); + std.fs.cwd().deleteTree(snapshot_path) catch |err| { + logger.warn().logf("failed to delete snapshot directory: {}", .{err}); + }; } - const accounts_db_dir = try validator_dir.makeOpenPath(sig.ACCOUNTS_DB_SUBDIR, .{ + const snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_path, .{ .iterate = true, }); - const snapshot_dir = maybe_snapshot_dir orelse accounts_db_dir; // download a new snapshot if required const snapshot_exists = blk: { - _ = SnapshotFiles.find(allocator, snapshot_dir) catch |err| { - std.debug.print("failed to find snapshot files: {}\n", .{err}); + _ = SnapshotFiles.find(allocator, snapshot_dir) catch { break :blk false; }; break :blk true; @@ -494,6 +503,7 @@ pub fn getOrDownloadAndUnpackSnapshot( gossip_service, snapshot_dir, @intCast(min_mb_per_sec), + options.max_number_of_download_attempts, ); defer full.close(); defer if (maybe_inc) |inc| inc.close(); @@ -506,7 +516,7 @@ pub fn getOrDownloadAndUnpackSnapshot( // do a quick sanity check on the number of files in accounts/ // NOTE: this is sometimes the case that you unpacked only a portion // of the snapshot - var accounts_dir = accounts_db_dir.openDir("accounts", .{}) catch |err| switch (err) { + var accounts_dir = snapshot_dir.openDir("accounts", .{}) catch |err| switch (err) { // accounts folder doesnt exist, so its invalid error.FileNotFound => break :blk false, else => return err, @@ -557,7 +567,7 @@ pub fn getOrDownloadAndUnpackSnapshot( allocator, logger.unscoped(), archive_file, - accounts_db_dir, + snapshot_dir, n_threads_snapshot_unpack, true, ); @@ -576,7 +586,7 @@ pub fn getOrDownloadAndUnpackSnapshot( allocator, logger.unscoped(), archive_file, - accounts_db_dir, + snapshot_dir, n_threads_snapshot_unpack, false, ); @@ -592,7 +602,7 @@ pub fn getOrDownloadAndUnpackSnapshot( const snapshot_fields = try FullAndIncrementalManifest.fromFiles( allocator, logger.unscoped(), - accounts_db_dir, + snapshot_dir, snapshot_files, ); logger.info().logf("read snapshot metdata in {s}", .{std.fmt.fmtDuration(timer.read())}); diff --git a/src/accountsdb/index.zig b/src/accountsdb/index.zig index d4d68d3cd..7be4706b1 100644 --- a/src/accountsdb/index.zig +++ b/src/accountsdb/index.zig @@ -311,7 +311,7 @@ pub const AccountIndex = struct { // manager must be empty std.debug.assert(self.reference_manager.capacity == 0); - self.logger.info().log("loading state from disk..."); + self.logger.info().log("running account_index.loadFromDisk"); const reference_file = try dir.openFile("index.bin", .{}); const size = (try reference_file.stat()).size; const index_memory = try std.posix.mmap( diff --git a/src/benchmarks.zig b/src/benchmarks.zig index 0d083ba0f..b18e989e4 100644 --- a/src/benchmarks.zig +++ b/src/benchmarks.zig @@ -107,7 +107,6 @@ pub fn main() !void { continue; } else if (std.mem.startsWith(u8, arg, "-e")) { run_expensive_benchmarks = true; - collect_metrics = true; // by default collect metrics when running expensive benchmarks continue; } else if (std.mem.startsWith(u8, arg, "-f")) { force_fresh_state = true; @@ -130,6 +129,17 @@ pub fn main() !void { } else { logger.info().logf("running benchmark with filter: {s}", .{@tagName(filter)}); } + if (collect_metrics) { + logger.info().log("collecting metrics"); + } + if (run_expensive_benchmarks) { + logger.info().log("running expensive benchmarks"); + } + if (force_fresh_state) { + logger.info().log("forcing fresh state for expensive benchmarks"); + } else { + logger.info().log("re-using state for expensive benchmarks"); + } const max_time_per_bench = Duration.fromSecs(5); // !! const run_all_benchmarks = filter == .all; @@ -179,7 +189,7 @@ pub fn main() !void { logger.warn().log("[accounts_db_snapshot]: skipping benchmark, use -e to run"); } - if ((filter == .accounts_db_snapshot or run_all) and run_expensive_benchmarks) { + if ((filter == .accounts_db_snapshot or run_all) and run_expensive_benchmarks) snapshot_benchmark: { // NOTE: snapshot must exist in this directory for the benchmark to run // NOTE: also need to increase file limits to run this benchmark (see debugging.md) const BENCH_SNAPSHOT_DIR_PATH = @import("accountsdb/db.zig") @@ -197,12 +207,13 @@ pub fn main() !void { if (download_new_snapshot) { // delete existing snapshot dir if (test_snapshot_exists) { - std.debug.print("deleting snapshot dir...\n", .{}); + logger.info().log("deleting snapshot dir..."); std.fs.cwd().deleteTreeMinStackSize(BENCH_SNAPSHOT_DIR_PATH) catch |err| { - std.debug.print("failed to delete snapshot dir ('{s}'): {}\n", .{ + logger.err().logf("failed to delete snapshot dir ('{s}'): {}", .{ BENCH_SNAPSHOT_DIR_PATH, err, }); + return err; }; } @@ -227,16 +238,25 @@ pub fn main() !void { try gossip_service.start(.{}); // download and unpack snapshot - var snapshot_manifests, _ = try sig.accounts_db.download.getOrDownloadAndUnpackSnapshot( + var snapshot_manifests, _ = sig.accounts_db.download.getOrDownloadAndUnpackSnapshot( allocator, logger, - snapshot_dir, - null, + BENCH_SNAPSHOT_DIR_PATH, .{ .gossip_service = gossip_service, .force_new_snapshot_download = true, + .max_number_of_download_attempts = 50, + .min_snapshot_download_speed_mbs = 10, }, - ); + ) catch |err| { + switch (err) { + error.UnableToDownloadSnapshot => { + logger.err().log("unable to download snapshot, skipping benchmark..."); + break :snapshot_benchmark; + }, + else => return err, + } + }; defer snapshot_manifests.deinit(allocator); } diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index d0559ca26..78b6a23bf 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -307,11 +307,7 @@ pub fn run() !void { var accounts_per_file_estimate = cli.Option{ .long_name = "accounts-per-file-estimate", .short_alias = 'a', - .help = - \\number of accounts to estimate inside of account files (used for pre-allocation). - \\Safer to set it larger than smaller. - \\(approx values we found work well testnet/devnet: 1_500, mainnet: 3_000)" - , + .help = "number of accounts to estimate inside of account files (used for pre-allocation)", .value_ref = cli.mkRef(&config.current.accounts_db.accounts_per_file_estimate), .required = false, .value_name = "accounts_per_file_estimate", @@ -1394,21 +1390,18 @@ fn loadSnapshot( ) !LoadedSnapshot { const logger = unscoped_logger.withScope(@typeName(@This()) ++ "." ++ @src().fn_name); - var validator_dir = try std.fs.cwd().openDir(sig.VALIDATOR_DIR, .{}); + var validator_dir = try std.fs.cwd().makeOpenPath(sig.VALIDATOR_DIR, .{}); defer validator_dir.close(); const genesis_file_path = try config.current.genesisFilePath() orelse return error.GenesisPathNotProvided; const snapshot_dir_str = config.current.accounts_db.snapshot_dir; - var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_dir_str, .{ .iterate = true }); - defer snapshot_dir.close(); const combined_manifest, const snapshot_files = try sig.accounts_db.download.getOrDownloadAndUnpackSnapshot( allocator, logger.unscoped(), - validator_dir, - snapshot_dir, + snapshot_dir_str, .{ .gossip_service = options.gossip_service, .force_unpack_snapshot = config.current.accounts_db.force_unpack_snapshot, @@ -1418,6 +1411,9 @@ fn loadSnapshot( }, ); + var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_dir_str, .{ .iterate = true }); + defer snapshot_dir.close(); + logger.info().logf("full snapshot: {s}", .{ sig.utils.fmt.tryRealPath(snapshot_dir, snapshot_files.full.snapshotArchiveName().constSlice()), }); @@ -1555,6 +1551,7 @@ fn downloadSnapshot() !void { gossip_service, snapshot_dir, @intCast(min_mb_per_sec), + config.current.accounts_db.max_number_of_snapshot_download_attempts, ); defer full_file.close(); defer if (maybe_inc_file) |inc_file| inc_file.close(); diff --git a/src/cmd/config.zig b/src/cmd/config.zig index 966ea770e..19f4a30bf 100644 --- a/src/cmd/config.zig +++ b/src/cmd/config.zig @@ -155,6 +155,8 @@ pub const AccountsDBConfig = struct { save_index: bool = false, /// only load snapshot metadata when starting up snapshot_metadata_only: bool = false, + /// maximum number of snapshot download attempts before failing + max_number_of_snapshot_download_attempts: u64 = 1_000, }; pub const GeyserConfig = struct { diff --git a/src/gossip/helpers.zig b/src/gossip/helpers.zig index 395be5c97..dcd85e39b 100644 --- a/src/gossip/helpers.zig +++ b/src/gossip/helpers.zig @@ -31,7 +31,6 @@ pub fn initGossipFromCluster( const socket_addr = try resolveSocketAddr(allocator, entrypoint_str); try entrypoints.append(socket_addr); } - logger.info().logf("using predefined entrypoints: {any}", .{entrypoints}); // create contact info const echo_data = try getShredAndIPFromEchoServer( @@ -40,20 +39,27 @@ pub fn initGossipFromCluster( entrypoints.items, ); const my_shred_version = echo_data.shred_version orelse 0; - logger.info().logf("my shred version: {d}", .{my_shred_version}); const my_ip = echo_data.ip orelse IpAddr.newIpv4(127, 0, 0, 1); - logger.info().logf("my ip: {any}", .{my_ip}); const default_config = sig.cmd.config.GossipConfig{}; - const my_port = default_config.port; // default port + // NOTE: we dont use the default port to avoid port collisions with other gossip + // services running on the same machine + const my_port = default_config.port + 5; const my_keypair = try getOrInitIdentity(allocator, logger); - logger.info().logf("gossip_port: {d}", .{my_port}); const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key); var contact_info = ContactInfo.init(allocator, my_pubkey, getWallclockMs(), 0); try contact_info.setSocket(.gossip, SocketAddr.init(my_ip, my_port)); contact_info.shred_version = my_shred_version; + logger.info() + .field("my_pubkey", my_pubkey) + .field("my_ip", my_ip) + .field("my_shred_version", my_shred_version) + .field("gossip_port", my_port) + .field("entrypoints", entrypoints.items) + .log("setting up gossip"); + // create gossip return try GossipService.create( allocator, diff --git a/src/gossip/lib.zig b/src/gossip/lib.zig index adafbd866..e95a32789 100644 --- a/src/gossip/lib.zig +++ b/src/gossip/lib.zig @@ -11,6 +11,7 @@ pub const table = @import("table.zig"); pub const fuzz_service = @import("fuzz_service.zig"); pub const fuzz_table = @import("fuzz_table.zig"); pub const helpers = @import("helpers.zig"); +pub const prune = @import("prune.zig"); pub const ContactInfo = data.ContactInfo; pub const GossipService = service.GossipService; @@ -21,5 +22,6 @@ pub const LowestSlot = data.LowestSlot; pub const Ping = ping_pong.Ping; pub const Pong = ping_pong.Pong; pub const SocketTag = data.SocketTag; +pub const PruneData = prune.PruneData; pub const getClusterEntrypoints = service.getClusterEntrypoints; diff --git a/src/gossip/message.zig b/src/gossip/message.zig index 57e831cc9..875b7189a 100644 --- a/src/gossip/message.zig +++ b/src/gossip/message.zig @@ -5,7 +5,6 @@ const bincode = sig.bincode; const testing = std.testing; const Pubkey = sig.core.Pubkey; -const Signature = sig.core.Signature; const SocketAddr = sig.net.SocketAddr; const SignedGossipData = sig.gossip.data.SignedGossipData; const GossipData = sig.gossip.data.GossipData; @@ -15,10 +14,8 @@ const Ping = sig.gossip.ping_pong.Ping; const Pong = sig.gossip.ping_pong.Pong; const DefaultPrng = std.rand.DefaultPrng; const KeyPair = std.crypto.sign.Ed25519.KeyPair; +const PruneData = sig.gossip.prune.PruneData; -const getWallclockMs = sig.time.getWallclockMs; - -const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000; /// Analogous to [Protocol](https://github.com/solana-labs/solana/blob/e0203f22dc83cb792fa97f91dbe6e924cbd08af1/gossip/src/cluster_info.rs#L268) @@ -104,85 +101,7 @@ pub fn sanitizeWallclock(wallclock: u64) !void { } } -pub const PruneData = struct { - /// Pubkey of the node that sent this prune data - pubkey: Pubkey, - /// Pubkeys of nodes that should be pruned - prunes: []const Pubkey, - /// Signature of this Prune Message - signature: Signature, - /// The Pubkey of the intended node/destination for this message - destination: Pubkey, - /// Wallclock of the node that generated this message - wallclock: u64, - - const Self = @This(); - - pub fn init(pubkey: Pubkey, prunes: []Pubkey, destination: Pubkey, now: u64) Self { - return Self{ - .pubkey = pubkey, - .prunes = prunes, - .destination = destination, - .signature = Signature.init(.{0} ** 64), - .wallclock = now, - }; - } - - pub fn deinit(self: Self, allocator: std.mem.Allocator) void { - allocator.free(self.prunes); - } - - const PruneSignableData = struct { - pubkey: Pubkey, - prunes: []const Pubkey, - destination: Pubkey, - wallclock: u64, - }; - - pub fn initRandom(random: std.rand.Random, keypair: *KeyPair) !PruneData { - var self = PruneData{ - .pubkey = Pubkey.fromPublicKey(&keypair.public_key), - .prunes = &[0]Pubkey{}, - .signature = Signature.init(.{0} ** 64), - .destination = Pubkey.initRandom(random), - .wallclock = getWallclockMs(), - }; - try self.sign(keypair); - - return self; - } - - pub fn sign(self: *PruneData, keypair: *const KeyPair) !void { - // should always be enough space of is invalid msg - var slice: [PACKET_DATA_SIZE]u8 = undefined; - const signable_data = PruneSignableData{ - .pubkey = self.pubkey, - .prunes = self.prunes, - .destination = self.destination, - .wallclock = self.wallclock, - }; - const out = try bincode.writeToSlice(&slice, signable_data, bincode.Params{}); - var signature = try keypair.sign(out, null); - self.signature.data = signature.toBytes(); - } - - pub fn verify(self: *const PruneData) !void { - // should always be enough space of is invalid msg - var slice: [PACKET_DATA_SIZE]u8 = undefined; - const signable_data = PruneSignableData{ - .pubkey = self.pubkey, - .prunes = self.prunes, - .destination = self.destination, - .wallclock = self.wallclock, - }; - const out = try bincode.writeToSlice(&slice, signable_data, bincode.Params{}); - if (!try self.signature.verify(self.pubkey, out)) { - return error.InvalidSignature; - } - } -}; - -test "gossip.message: push message serialization is predictable" { +test "push message serialization is predictable" { var prng = DefaultPrng.init(0); const pubkey = Pubkey.initRandom(prng.random()); var values = std.ArrayList(SignedGossipData).init(std.testing.allocator); @@ -202,35 +121,7 @@ test "gossip.message: push message serialization is predictable" { try std.testing.expectEqual(value_size + empty_size, msg_value_size); } -test "gossip.message: test prune data sig verify" { - var keypair = try KeyPair.fromSecretKey(try std.crypto.sign.Ed25519.SecretKey.fromBytes([_]u8{ - 125, 52, 162, 97, 231, 139, 58, 13, 185, 212, 57, 142, 136, 12, 21, 127, 228, 71, - 115, 126, 138, 52, 102, 69, 103, 185, 45, 255, 132, 222, 243, 138, 25, 117, 21, 11, - 61, 170, 38, 18, 67, 196, 242, 219, 50, 154, 4, 254, 79, 227, 253, 229, 188, 230, - 121, 12, 227, 248, 199, 156, 253, 144, 175, 67, - })); - - var prng = DefaultPrng.init(0); - var prune = try PruneData.initRandom(prng.random(), &keypair); - - try prune.verify(); - - const rust_bytes = [_]u8{ 80, 98, 7, 181, 129, 96, 249, 247, 34, 39, 251, 41, 125, 241, 31, 25, 122, 103, 202, 48, 78, 160, 222, 65, 228, 81, 171, 237, 233, 87, 248, 29, 37, 0, 19, 66, 83, 207, 78, 86, 232, 157, 184, 144, 71, 12, 223, 86, 144, 169, 160, 171, 139, 248, 106, 63, 194, 178, 144, 119, 51, 60, 201, 7 }; - - var prune_v2 = PruneData{ - .pubkey = Pubkey.fromPublicKey(&keypair.public_key), - .prunes = &[0]Pubkey{}, - .signature = Signature.init(.{0} ** 64), - .destination = Pubkey.fromPublicKey(&keypair.public_key), - .wallclock = 0, - }; - try prune_v2.sign(&keypair); - - var sig_bytes = prune_v2.signature.data; - try std.testing.expectEqualSlices(u8, &rust_bytes, &sig_bytes); -} - -test "gossip.message: ping message serializes and deserializes correctly" { +test "ping message serializes and deserializes correctly" { var keypair = KeyPair.create(null) catch unreachable; var prng = std.rand.DefaultPrng.init(0); @@ -247,7 +138,7 @@ test "gossip.message: ping message serializes and deserializes correctly" { try testing.expect(std.mem.eql(u8, original.PingMessage.token[0..], deserialized.PingMessage.token[0..])); } -test "gossip.message: test ping pong sig verify" { +test "test ping pong sig verify" { var keypair = KeyPair.create(null) catch unreachable; var prng = std.rand.DefaultPrng.init(0); @@ -259,7 +150,7 @@ test "gossip.message: test ping pong sig verify" { try pong.verifySignature(); } -test "gossip.message: pull request serializes and deserializes" { +test "pull request serializes and deserializes" { var rust_bytes = [_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 190, 193, 13, 216, 175, 227, 117, 168, 246, 219, 213, 39, 67, 249, 88, 3, 238, 151, 144, 15, 23, 142, 153, 198, 47, 221, 117, 132, 218, 28, 29, 115, 248, 253, 211, 101, 137, 19, 174, 112, 43, 57, 251, 110, 173, 14, 71, 0, 186, 24, 36, 61, 75, 241, 119, 73, 86, 93, 136, 249, 167, 40, 134, 14, 0, 0, 0, 0, 25, 117, 21, 11, 61, 170, 38, 18, 67, 196, 242, 219, 50, 154, 4, 254, 79, 227, 253, 229, 188, 230, 121, 12, 227, 248, 199, 156, 253, 144, 175, 67, 0, 0, 0, 0, 127, 0, 0, 1, 210, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; var keypair = try KeyPair.fromSecretKey(try std.crypto.sign.Ed25519.SecretKey.fromBytes([_]u8{ 125, 52, 162, 97, 231, 139, 58, 13, 185, 212, 57, 142, 136, 12, 21, 127, 228, 71, @@ -304,7 +195,7 @@ test "gossip.message: pull request serializes and deserializes" { try std.testing.expectEqualDeep(pull, deserialized); } -test "gossip.message: push message serializes and deserializes correctly" { +test "push message serializes and deserializes correctly" { const kp_bytes = [_]u8{1} ** 32; const kp = try KeyPair.create(kp_bytes); const pk = kp.public_key; @@ -359,7 +250,7 @@ test "gossip.message: push message serializes and deserializes correctly" { try testing.expectEqualSlices(u8, &rust_bytes, bytes); } -test "gossip.message: Protocol.PullRequest.ContactInfo signature is valid" { +test "Protocol.PullRequest.ContactInfo signature is valid" { var contact_info_pull_response_packet_from_mainnet = [_]u8{ 1, 0, 0, 0, 9, 116, 228, 64, 179, 73, 145, 220, 74, 55, 179, 56, 86, 218, 47, 62, 172, 162, 127, 102, 37, 146, 103, 117, 255, 245, 248, 212, 101, 163, 188, 231, diff --git a/src/gossip/prune.zig b/src/gossip/prune.zig new file mode 100644 index 000000000..316a786f8 --- /dev/null +++ b/src/gossip/prune.zig @@ -0,0 +1,218 @@ +const std = @import("std"); +const sig = @import("../sig.zig"); + +const bincode = sig.bincode; + +const Pubkey = sig.core.Pubkey; +const Signature = sig.core.Signature; +const DefaultPrng = std.rand.DefaultPrng; +const KeyPair = std.crypto.sign.Ed25519.KeyPair; +const SecretKey = std.crypto.sign.Ed25519.SecretKey; + +const getWallclockMs = sig.time.getWallclockMs; + +const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; +pub const PRUNE_DATA_PREFIX: []const u8 = "\xffSOLANA_PRUNE_DATA"; + +pub const PruneData = struct { + /// Pubkey of the node that sent this prune data + pubkey: Pubkey, + /// Pubkeys of nodes that should be pruned + prunes: []const Pubkey, + /// Signature of this Prune Message + signature: Signature, + /// The Pubkey of the intended node/destination for this message + destination: Pubkey, + /// Wallclock of the node that generated this message + wallclock: u64, + + const Self = @This(); + + pub fn init(pubkey: Pubkey, prunes: []Pubkey, destination: Pubkey, now: u64) Self { + return Self{ + .pubkey = pubkey, + .prunes = prunes, + .destination = destination, + .signature = Signature.init(.{0} ** 64), + .wallclock = now, + }; + } + + pub fn deinit(self: Self, allocator: std.mem.Allocator) void { + allocator.free(self.prunes); + } + + const PruneSignableData = struct { + pubkey: Pubkey, + prunes: []const Pubkey, + destination: Pubkey, + wallclock: u64, + }; + + const PruneSignableDataWithPrefix = struct { + prefix: []const u8 = PRUNE_DATA_PREFIX, + pubkey: Pubkey, + prunes: []const Pubkey, + destination: Pubkey, + wallclock: u64, + }; + + pub fn initRandom(random: std.rand.Random, keypair: *const KeyPair) !PruneData { + var self = PruneData{ + .pubkey = Pubkey.fromPublicKey(&keypair.public_key), + .prunes = &[0]Pubkey{}, + .signature = Signature.init(.{0} ** 64), + .destination = Pubkey.initRandom(random), + .wallclock = getWallclockMs(), + }; + try self.sign(keypair); + + return self; + } + + pub fn sign(self: *PruneData, keypair: *const KeyPair) !void { + try self.signWithoutPrefix(keypair); + } + + pub fn signWithoutPrefix(self: *PruneData, keypair: *const KeyPair) !void { + const signable_data = PruneSignableData{ + .pubkey = self.pubkey, + .prunes = self.prunes, + .destination = self.destination, + .wallclock = self.wallclock, + }; + + // serialize + var d: [PACKET_DATA_SIZE]u8 = undefined; + const data = try bincode.writeToSlice(&d, signable_data, .{}); + // sign + var signature = try keypair.sign(data, null); + self.signature.data = signature.toBytes(); + } + + pub fn signWithPrefix(self: *PruneData, keypair: *const KeyPair) !void { + const signable_data = PruneSignableDataWithPrefix{ + .pubkey = self.pubkey, + .prunes = self.prunes, + .destination = self.destination, + .wallclock = self.wallclock, + }; + + // serialize + var d: [PACKET_DATA_SIZE]u8 = undefined; + const data = try bincode.writeToSlice(&d, signable_data, .{}); + // sign + var signature = try keypair.sign(data, null); + self.signature.data = signature.toBytes(); + } + + pub fn verify(self: *const PruneData) !void { + self.verifyWithoutPrefix() catch |err| switch (err) { + error.InvalidSignature => try self.verifyWithPrefix(), + else => return err, + }; + } + + pub fn verifyWithoutPrefix(self: *const PruneData) !void { + const signable_data = PruneSignableData{ + .pubkey = self.pubkey, + .prunes = self.prunes, + .destination = self.destination, + .wallclock = self.wallclock, + }; + + // serialize + var d: [PACKET_DATA_SIZE]u8 = undefined; + const data = try bincode.writeToSlice(&d, signable_data, .{}); + // verify + if (!try self.signature.verify(self.pubkey, data)) + return error.InvalidSignature; + } + + pub fn verifyWithPrefix(self: *const PruneData) !void { + const signable_data = PruneSignableDataWithPrefix{ + .pubkey = self.pubkey, + .prunes = self.prunes, + .destination = self.destination, + .wallclock = self.wallclock, + }; + + // serialize + var d: [PACKET_DATA_SIZE]u8 = undefined; + const data = try bincode.writeToSlice(&d, signable_data, .{}); + // verify + if (!try self.signature.verify(self.pubkey, data)) + return error.InvalidSignature; + } +}; + +test "sign/verify PruneData with prefix" { + // src: https://github.com/anza-xyz/agave/blob/82347779ffdad910ce1f4bb23949e0c46bdddd33/gossip/src/protocol.rs#L686 + const wallclock = 1736887210868; + const keypair = try KeyPair.fromSecretKey(try SecretKey.fromBytes([_]u8{ + 187, 129, 57, 32, 118, 252, 92, 64, 33, 91, 198, 4, 45, 142, 35, 144, 247, + 236, 207, 93, 140, 218, 133, 14, 145, 14, 121, 148, 86, 67, 243, 201, 74, 44, + 91, 45, 177, 37, 96, 182, 179, 147, 191, 143, 138, 47, 10, 56, 172, 249, 27, + 230, 102, 29, 182, 139, 6, 61, 35, 28, 233, 6, 63, 229, + })); + const pubkey = Pubkey.fromPublicKey(&keypair.public_key); + const expected_pubkey = try Pubkey.fromString("5zYQ7PqYa81fw3rXAYUtmUcoL9TFwG67wcE9LW8hwtfE"); + try std.testing.expectEqual(expected_pubkey.data, pubkey.data); + + const prune1 = try Pubkey.fromString("1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM"); + const prune2 = try Pubkey.fromString("1111111ogCyDbaRMvkdsHB3qfdyFYaG1WtRUAfdh"); + const prune3 = try Pubkey.fromString("11111112D1oxKts8YPdTJRG5FzxTNpMtWmq8hkVx3"); + const destination = try Pubkey.fromString("11111112cMQwSC9qirWGjZM6gLGwW69X22mqwLLGP"); + + const expected_signature = try Signature.fromString( + "XjXQxG6vhrfPPQtddCgkfmKsH69YoUvG6GTrQfvmB73GUTjXCL5VDBE3Na94e4uT2MWPTBP3cinVdpHdBb9zAxY", + ); + + var prune_data = PruneData{ + .pubkey = pubkey, + .destination = destination, + .prunes = &[_]Pubkey{ prune1, prune2, prune3 }, + .signature = expected_signature, + .wallclock = wallclock, + }; + + // check if verification works (with expected signature) + try prune_data.verify(); + + // check if signing works + try prune_data.signWithPrefix(&keypair); + try std.testing.expectEqual(expected_signature.data, prune_data.signature.data); +} + +test "PruneData sig verify" { + var keypair = try KeyPair.fromSecretKey(try std.crypto.sign.Ed25519.SecretKey.fromBytes([_]u8{ + 125, 52, 162, 97, 231, 139, 58, 13, 185, 212, 57, 142, 136, 12, 21, 127, 228, 71, + 115, 126, 138, 52, 102, 69, 103, 185, 45, 255, 132, 222, 243, 138, 25, 117, 21, 11, + 61, 170, 38, 18, 67, 196, 242, 219, 50, 154, 4, 254, 79, 227, 253, 229, 188, 230, + 121, 12, 227, 248, 199, 156, 253, 144, 175, 67, + })); + + var prng = DefaultPrng.init(0); + var prune = try PruneData.initRandom(prng.random(), &keypair); + + try prune.verify(); + + const rust_bytes = [_]u8{ + 80, 98, 7, 181, 129, 96, 249, 247, 34, 39, 251, 41, 125, 241, 31, 25, 122, 103, + 202, 48, 78, 160, 222, 65, 228, 81, 171, 237, 233, 87, 248, 29, 37, 0, 19, 66, + 83, 207, 78, 86, 232, 157, 184, 144, 71, 12, 223, 86, 144, 169, 160, 171, 139, 248, + 106, 63, 194, 178, 144, 119, 51, 60, 201, 7, + }; + + var prune_v2 = PruneData{ + .pubkey = Pubkey.fromPublicKey(&keypair.public_key), + .prunes = &[0]Pubkey{}, + .signature = Signature.init(.{0} ** 64), + .destination = Pubkey.fromPublicKey(&keypair.public_key), + .wallclock = 0, + }; + try prune_v2.signWithoutPrefix(&keypair); + + var sig_bytes = prune_v2.signature.data; + try std.testing.expectEqualSlices(u8, &rust_bytes, &sig_bytes); +} diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 044b4ce47..2fd4c422b 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -41,7 +41,7 @@ const SignedGossipData = sig.gossip.data.SignedGossipData; const GossipData = sig.gossip.data.GossipData; const GossipDumpService = sig.gossip.dump_service.GossipDumpService; const GossipMessage = sig.gossip.message.GossipMessage; -const PruneData = sig.gossip.message.PruneData; +const PruneData = sig.gossip.PruneData; const GossipTable = sig.gossip.table.GossipTable; const HashTimeQueue = sig.gossip.table.HashTimeQueue; const AutoArrayHashSet = sig.gossip.table.AutoArrayHashSet; @@ -98,6 +98,7 @@ const DEFAULT_EPOCH_DURATION = Duration.fromMillis(172_800_000); pub const VERIFY_PACKET_PARALLEL_TASKS = 4; +const THREAD_POOL_SIZE = 4; const MAX_PROCESS_BATCH_SIZE = 64; const GOSSIP_PRNG_SEED = 19; @@ -232,7 +233,7 @@ pub const GossipService = struct { gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT_US) catch return error.SocketSetTimeoutFailed; // 1 second // setup the threadpool for processing messages - const n_threads: usize = @min(std.Thread.getCpuCount() catch 1, 8); + const n_threads: usize = @min(std.Thread.getCpuCount() catch 1, THREAD_POOL_SIZE); const thread_pool = ThreadPool.init(.{ .max_threads = @intCast(n_threads), .stack_size = 2 * 1024 * 1024, @@ -266,7 +267,7 @@ pub const GossipService = struct { const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key); const my_shred_version = my_contact_info.shred_version; const failed_pull_hashes = HashTimeQueue.init(allocator); - const metrics = try GossipMetrics.init(gossip_logger); + const metrics = try GossipMetrics.init(); const exit_counter = try allocator.create(Atomic(u64)); exit_counter.* = Atomic(u64).init(0); @@ -737,8 +738,6 @@ pub const GossipService = struct { // only add the count once we've finished processing defer self.metrics.gossip_packets_processed_total.add(gossip_packets_processed_total); - self.metrics.maybeLog(); - // handle batch messages if (push_messages.items.len > 0) { var x_timer = try sig.time.Timer.start(); @@ -2096,36 +2095,7 @@ pub const GossipMetrics = struct { table_pubkeys_dropped: *Counter, table_old_values_removed: *Counter, - // logging details - _logging_fields: struct { - // Scoping to GossipService instead of logging fields struct. - logger: GossipService.ScopedLogger, - log_interval_micros: i64 = 10 * std.time.us_per_s, - last_log: i64 = 0, - last_logged_snapshot: StatsToLog = .{}, - updates_since_last: u64 = 0, - }, - const GaugeU64 = Gauge(u64); - - const StatsToLog = struct { - gossip_packets_received_total: u64 = 0, - - ping_messages_recv: u64 = 0, - pong_messages_recv: u64 = 0, - push_messages_recv: u64 = 0, - pull_requests_recv: u64 = 0, - pull_responses_recv: u64 = 0, - prune_messages_recv: u64 = 0, - - ping_messages_sent: u64 = 0, - pong_messages_sent: u64 = 0, - push_messages_sent: u64 = 0, - pull_requests_sent: u64 = 0, - pull_responses_sent: u64 = 0, - prune_messages_sent: u64 = 0, - }; - const Self = @This(); pub const histogram_buckets: [10]f64 = .{ @@ -2136,76 +2106,18 @@ pub const GossipMetrics = struct { 5000, 10000, }; - pub fn init(logger: GossipService.ScopedLogger) GetMetricError!Self { + pub fn init() GetMetricError!Self { var self: Self = undefined; const registry = globalRegistry(); - std.debug.assert(try registry.initFields(&self) == 1); - self._logging_fields = .{ .logger = logger }; + std.debug.assert(try registry.initFields(&self) == 0); return self; } pub fn reset(self: *Self) void { inline for (@typeInfo(GossipMetrics).Struct.fields) |field| { - if (field.name[0] != '_') { - @field(self, field.name).reset(); - } + @field(self, field.name).reset(); } } - - /// If log_interval_millis has passed since the last log, - /// then log the number of events since then. - fn maybeLog( - self: *Self, - ) void { - const now = std.time.microTimestamp(); - const logging_fields = self._logging_fields; - const interval = @as(u64, @intCast(now -| logging_fields.last_log)); - if (interval < logging_fields.log_interval_micros) return; - - const current_stats = StatsToLog{ - .gossip_packets_received_total = self.gossip_packets_received_total.get(), - .ping_messages_recv = self.ping_messages_recv.get(), - .pong_messages_recv = self.pong_messages_recv.get(), - .push_messages_recv = self.push_messages_recv.get(), - .pull_requests_recv = self.pull_requests_recv.get(), - .pull_responses_recv = self.pull_responses_recv.get(), - .prune_messages_recv = self.prune_messages_recv.get(), - - .ping_messages_sent = self.ping_messages_sent.get(), - .pong_messages_sent = self.pong_messages_sent.get(), - .push_messages_sent = self.push_messages_sent.get(), - .pull_requests_sent = self.pull_requests_sent.get(), - .pull_responses_sent = self.pull_responses_sent.get(), - .prune_messages_sent = self.prune_messages_sent.get(), - }; - - logging_fields.logger.info().logf( - "recv {}: {} ping, {} pong, {} push, {} pull request, {} pull response, {} prune", - .{ - current_stats.gossip_packets_received_total - logging_fields.last_logged_snapshot.gossip_packets_received_total, - current_stats.ping_messages_recv - logging_fields.last_logged_snapshot.ping_messages_recv, - current_stats.pong_messages_recv - logging_fields.last_logged_snapshot.pong_messages_recv, - current_stats.push_messages_recv - logging_fields.last_logged_snapshot.push_messages_recv, - current_stats.pull_requests_recv - logging_fields.last_logged_snapshot.pull_requests_recv, - current_stats.pull_responses_recv - logging_fields.last_logged_snapshot.pull_responses_recv, - current_stats.prune_messages_recv - logging_fields.last_logged_snapshot.prune_messages_recv, - }, - ); - logging_fields.logger.info().logf( - "sent: {} ping, {} pong, {} push, {} pull request, {} pull response, {} prune", - .{ - current_stats.ping_messages_sent - logging_fields.last_logged_snapshot.ping_messages_sent, - current_stats.pong_messages_sent - logging_fields.last_logged_snapshot.pong_messages_sent, - current_stats.push_messages_sent - logging_fields.last_logged_snapshot.push_messages_sent, - current_stats.pull_requests_sent - logging_fields.last_logged_snapshot.pull_requests_sent, - current_stats.pull_responses_sent - logging_fields.last_logged_snapshot.pull_responses_sent, - current_stats.prune_messages_sent - logging_fields.last_logged_snapshot.prune_messages_sent, - }, - ); - self._logging_fields.last_logged_snapshot = current_stats; - self._logging_fields.last_log = now; - self._logging_fields.updates_since_last = 0; - } }; pub const ChunkType = enum(u8) { diff --git a/src/ledger/fuzz.zig b/src/ledger/fuzz.zig index 07bda5dfc..1cf7654fd 100644 --- a/src/ledger/fuzz.zig +++ b/src/ledger/fuzz.zig @@ -87,7 +87,7 @@ pub fn run(initial_seed: u64, args: *std.process.ArgIterator) !void { var count: u64 = 0; - while (true) outer: { + outer: while (true) { var prng = std.Random.DefaultPrng.init(seed); const random = prng.random(); // This is a simpler blockstore which is used to make sure @@ -97,7 +97,7 @@ pub fn run(initial_seed: u64, args: *std.process.ArgIterator) !void { for (0..1_000) |_| { if (maybe_max_actions) |max| { if (count >= max) { - std.debug.print("{s} reached max actions: {}\n", .{ "action_name", max }); + std.debug.print("reached max actions: {}\n", .{max}); break :outer; } }