diff --git a/.gitignore b/.gitignore index 1616117f8..6152ca1d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,8 @@ .zig-cache/ +zig-cache/ zig-out/ data/ -# unpacking test snapshots -test_data/version -test_data/accounts/ -test_data/snapshots/ -test_data/tmp/ -test_data/bench_snapshot/ -test_data/accountsdb_fuzz/ index_storage/ /gossip-dumps diff --git a/src/accountsdb/genesis_config.zig b/src/accountsdb/genesis_config.zig index a9e56d1f9..0b342e09a 100644 --- a/src/accountsdb/genesis_config.zig +++ b/src/accountsdb/genesis_config.zig @@ -108,10 +108,10 @@ pub const EpochSchedule = extern struct { first_normal_slot: Slot, pub fn getEpoch(self: *const EpochSchedule, slot: Slot) Epoch { - return self.getEpochAndSlotIndex(slot).epoch; + return self.getEpochAndSlotIndex(slot)[0]; } - pub fn getEpochAndSlotIndex(self: *const EpochSchedule, slot: Slot) struct { epoch: Epoch, slot_index: Slot } { + pub fn getEpochAndSlotIndex(self: *const EpochSchedule, slot: Slot) struct { Epoch, Slot } { if (slot < self.first_normal_slot) { var epoch = slot +| MINIMUM_SLOTS_PER_EPOCH +| 1; epoch = @ctz(std.math.ceilPowerOfTwo(u64, epoch) catch { @@ -123,10 +123,7 @@ pub const EpochSchedule = extern struct { const slot_index = slot -| (epoch_len -| MINIMUM_SLOTS_PER_EPOCH); - return .{ - .epoch = epoch, - .slot_index = slot_index, - }; + return .{ epoch, slot_index }; } else { const normal_slot_index = slot -| self.first_normal_slot; const normal_epoch_index = std.math.divTrunc(u64, normal_slot_index, self.slots_per_epoch) catch 0; @@ -134,12 +131,18 @@ pub const EpochSchedule = extern struct { const epoch = self.first_normal_epoch +| normal_epoch_index; const slot_index = std.math.rem(u64, normal_slot_index, self.slots_per_epoch) catch 0; - return .{ - .epoch = epoch, - .slot_index = slot_index, - }; + return .{ epoch, slot_index }; } } + + /// get the length of the given epoch (in slots) + pub fn getSlotsInEpoch(self: *const EpochSchedule, epoch: Epoch) Slot { + comptime std.debug.assert(std.math.isPowerOfTwo(MINIMUM_SLOTS_PER_EPOCH)); + return if (epoch < self.first_normal_epoch) + @as(Slot, 1) <<| epoch +| @ctz(MINIMUM_SLOTS_PER_EPOCH) + else + self.slots_per_epoch; + } }; pub const ClusterType = enum(u8) { diff --git a/src/accountsdb/lib.zig b/src/accountsdb/lib.zig index 3e7a448d3..de2680618 100644 --- a/src/accountsdb/lib.zig +++ b/src/accountsdb/lib.zig @@ -12,6 +12,7 @@ pub const AccountsDB = db.AccountsDB; pub const AllSnapshotFields = snapshots.AllSnapshotFields; pub const Bank = bank.Bank; pub const GenesisConfig = genesis_config.GenesisConfig; +pub const SnapshotFields = snapshots.SnapshotFields; pub const SnapshotFieldsAndPaths = snapshots.SnapshotFieldsAndPaths; pub const SnapshotFiles = snapshots.SnapshotFiles; pub const StatusCache = snapshots.StatusCache; diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index 8a6034f87..f7f440de0 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -55,7 +55,7 @@ pub const Stakes = struct { }; pub const VoteAccounts = struct { - vote_accounts: HashMap(Pubkey, struct { u64, Account }), + vote_accounts: HashMap(Pubkey, struct { u64, VoteAccount }), staked_nodes: ?HashMap( Pubkey, // VoteAccount.vote_state.node_pubkey. 
@@ -63,8 +63,61 @@ pub const VoteAccounts = struct { ) = null, pub const @"!bincode-config:staked_nodes" = bincode.FieldConfig(?HashMap(Pubkey, u64)){ .skip = true }; + + const Self = @This(); + + pub fn stakedNodes(self: *Self, allocator: std.mem.Allocator) !*const HashMap(Pubkey, u64) { + if (self.staked_nodes) |*staked_nodes| { + return staked_nodes; + } + const vote_accounts = self.vote_accounts; + var staked_nodes = HashMap(Pubkey, u64).init(allocator); + var iter = vote_accounts.iterator(); + while (iter.next()) |vote_entry| { + const vote_state = try vote_entry.value_ptr[1].voteState(); + const node_entry = try staked_nodes.getOrPut(vote_state.node_pubkey); + if (!node_entry.found_existing) { + node_entry.value_ptr.* = 0; + } + node_entry.value_ptr.* += vote_entry.value_ptr[0]; + } + self.staked_nodes = staked_nodes; + return &self.staked_nodes.?; + } }; +pub const VoteAccount = struct { + account: Account, + vote_state: ?anyerror!VoteState = null, + + pub const @"!bincode-config:vote_state" = bincode.FieldConfig(?anyerror!VoteState){ .skip = true }; + + pub fn voteState(self: *@This()) !VoteState { + if (self.vote_state) |vs| { + return vs; + } + self.vote_state = bincode.readFromSlice(undefined, VoteState, self.account.data, .{}); + return self.vote_state.?; + } +}; + +pub const VoteState = struct { + /// The variant of the rust enum + tag: u32, // TODO: consider varint bincode serialization (in rust this is enum) + /// the node that votes in this account + node_pubkey: Pubkey, +}; + +test "deserialize VoteState.node_pubkey" { + const bytes = .{ + 2, 0, 0, 0, 60, 155, 13, 144, 187, 252, 153, 72, 190, 35, 87, 94, 7, 178, + 90, 174, 158, 6, 199, 179, 134, 194, 112, 248, 166, 232, 144, 253, 128, 249, 67, 118, + } ++ .{0} ** 1586 ++ .{ 31, 0, 0, 0, 0, 0, 0, 0, 1 } ++ .{0} ** 24; + const vote_state = try bincode.readFromSlice(undefined, VoteState, &bytes, .{}); + const expected_pubkey = try Pubkey.fromString("55abJrqFnjm7ZRB1noVdh7BzBe3bBSMFT3pt16mw6Vad"); + try std.testing.expect(expected_pubkey.equals(&vote_state.node_pubkey)); +} + pub const Delegation = struct { /// to whom the stake is delegated voter_pubkey: Pubkey, diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index f0065e12c..a1a0b6da8 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -6,6 +6,7 @@ const helpers = @import("helpers.zig"); const sig = @import("../lib.zig"); const config = @import("config.zig"); +const Allocator = std.mem.Allocator; const Atomic = std.atomic.Value; const KeyPair = std.crypto.sign.Ed25519.KeyPair; const AccountsDB = sig.accounts_db.AccountsDB; @@ -19,6 +20,7 @@ const Level = sig.trace.Level; const Logger = sig.trace.Logger; const Pubkey = sig.core.Pubkey; const ShredCollectorDependencies = sig.shred_collector.ShredCollectorDependencies; +const SingleEpochLeaderSchedule = sig.core.leader_schedule.SingleEpochLeaderSchedule; const SnapshotFieldsAndPaths = sig.accounts_db.SnapshotFieldsAndPaths; const SnapshotFiles = sig.accounts_db.SnapshotFiles; const SocketAddr = sig.net.SocketAddr; @@ -29,9 +31,12 @@ const enumFromName = sig.utils.types.enumFromName; const getOrInitIdentity = helpers.getOrInitIdentity; const globalRegistry = sig.prometheus.globalRegistry; const getWallclockMs = sig.gossip.getWallclockMs; +const leaderScheduleFromBank = sig.core.leader_schedule.leaderScheduleFromBank; const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall; +const parseLeaderSchedule = sig.core.leader_schedule.parseLeaderSchedule; const requestIpEcho = sig.net.requestIpEcho; const 
servePrometheus = sig.prometheus.servePrometheus; +const writeLeaderSchedule = sig.core.leader_schedule.writeLeaderSchedule; const SocketTag = sig.gossip.SocketTag; @@ -92,6 +97,14 @@ var turbine_recv_port_option = cli.Option{ .value_name = "Turbine Port", }; +var leader_schedule_option = cli.Option{ + .long_name = "leader-schedule", + .help = "Set a file path to load the leader schedule. Use '--' to load from stdin", + .value_ref = cli.mkRef(&config.current.leader_schedule_path), + .required = false, + .value_name = "Leader schedule source", +}; + var test_repair_option = cli.Option{ .long_name = "test-repair-for-slot", .help = "Set a slot here to repeatedly send repair requests for shreds from this slot. This is only intended for use during short-lived tests of the repair service. Do not set this during normal usage.", @@ -298,6 +311,8 @@ var app = &cli.App{ &min_snapshot_download_speed_mb_option, &force_new_snapshot_download_option, &trusted_validators_option, + // general + &leader_schedule_option, }, .target = .{ .action = .{ @@ -330,6 +345,44 @@ var app = &cli.App{ }, }, }, + &cli.Command{ + .name = "leader-schedule", + .description = .{ + .one_line = "Prints the leader schedule from the snapshot", + .detailed = + \\- Starts gossip + \\- acquires a snapshot if necessary + \\- loads accounts db from the snapshot + \\- calculates the leader schedule from the snapshot + \\- prints the leader schedule in the same format as `solana leader-schedule` + \\- exits + , + }, + .options = &.{ + // gossip + &gossip_host.option, + &gossip_port_option, + &gossip_entrypoints_option, + &gossip_spy_node_option, + &gossip_dump_option, + // accounts-db + &snapshot_dir_option, + &use_disk_index_option, + &n_threads_snapshot_load_option, + &n_threads_snapshot_unpack_option, + &force_unpack_snapshot_option, + &min_snapshot_download_speed_mb_option, + &force_new_snapshot_download_option, + &trusted_validators_option, + // general + &leader_schedule_option, + }, + .target = .{ + .action = .{ + .exec = printLeaderSchedule, + }, + }, + }, }, }, }, @@ -349,44 +402,19 @@ fn identity() !void { /// entrypoint to run only gossip fn gossip() !void { - var logger = try spawnLogger(); - defer logger.deinit(); - const metrics_thread = try spawnMetrics(logger); - defer metrics_thread.detach(); - - var exit = std.atomic.Value(bool).init(false); - const my_keypair = try getOrInitIdentity(gpa_allocator, logger); - const entrypoints = try getEntrypoints(logger); - defer entrypoints.deinit(); - const my_data = try getMyDataFromIpEcho(logger, entrypoints.items); + var app_base = try AppBase.init(gpa_allocator); - var gossip_service = try initGossip( - logger, - my_keypair, - &exit, - entrypoints.items, - my_data.shred_version, - my_data.ip, - &.{}, - ); + var gossip_service, var gossip_manager = try startGossip(gpa_allocator, &app_base, &.{}); defer gossip_service.deinit(); + defer gossip_manager.deinit(); - try runGossipWithConfigValues(&gossip_service); + gossip_manager.join(); } /// entrypoint to run a full solana validator fn validator() !void { - var logger = try spawnLogger(); - defer logger.deinit(); - const metrics_thread = try spawnMetrics(logger); - defer metrics_thread.detach(); - - var rand = std.rand.DefaultPrng.init(@bitCast(std.time.timestamp())); - var exit = std.atomic.Value(bool).init(false); - const my_keypair = try getOrInitIdentity(gpa_allocator, logger); - const entrypoints = try getEntrypoints(logger); - defer entrypoints.deinit(); - const ip_echo_data = try getMyDataFromIpEcho(logger, 
entrypoints.items); + const allocator = gpa_allocator; + var app_base = try AppBase.init(allocator); const repair_port: u16 = config.current.shred_collector.repair_port; const turbine_recv_port: u16 = config.current.shred_collector.repair_port; @@ -394,104 +422,110 @@ fn validator() !void { try std.fs.cwd().makePath(snapshot_dir_str); - // gossip - var gossip_service = try initGossip( - logger, - my_keypair, - &exit, - entrypoints.items, - ip_echo_data.shred_version, // TODO atomic owned at top level? or owned by gossip is good? - ip_echo_data.ip, - &.{ - .{ .tag = .repair, .port = repair_port }, - .{ .tag = .turbine_recv, .port = turbine_recv_port }, - }, - ); + var gossip_service, var gossip_manager = try startGossip(allocator, &app_base, &.{ + .{ .tag = .repair, .port = repair_port }, + .{ .tag = .turbine_recv, .port = turbine_recv_port }, + }); defer gossip_service.deinit(); - const gossip_handle = try std.Thread.spawn(.{}, runGossipWithConfigValues, .{&gossip_service}); + defer gossip_manager.deinit(); + + const snapshot = try loadFromSnapshot(allocator, app_base.logger, gossip_service, false); + + // leader schedule + var leader_schedule = try getLeaderScheduleFromCli(allocator) orelse + try leaderScheduleFromBank(allocator, &snapshot.bank); + const leader_provider = leader_schedule.provider(); // shred collector + var shred_col_conf = config.current.shred_collector; + shred_col_conf.start_slot = shred_col_conf.start_slot orelse snapshot.bank.bank_fields.slot; + var rng = std.rand.DefaultPrng.init(@bitCast(std.time.timestamp())); var shred_collector_manager = try sig.shred_collector.start( - config.current.shred_collector, + shred_col_conf, ShredCollectorDependencies{ - .allocator = gpa_allocator, - .logger = logger, - .random = rand.random(), - .my_keypair = &my_keypair, - .exit = &exit, + .allocator = allocator, + .logger = app_base.logger, + .random = rng.random(), + .my_keypair = &app_base.my_keypair, + .exit = &app_base.exit, .gossip_table_rw = &gossip_service.gossip_table_rw, .my_shred_version = &gossip_service.my_shred_version, + .leader_schedule = leader_provider, }, ); defer shred_collector_manager.deinit(); - // accounts db - var snapshots = try getOrDownloadSnapshots( - gpa_allocator, - logger, - &gossip_service, - ); - defer snapshots.deinit(gpa_allocator); - - logger.infof("full snapshot: {s}", .{snapshots.full_path}); - if (snapshots.incremental_path) |inc_path| { - logger.infof("incremental snapshot: {s}", .{inc_path}); - } + gossip_manager.join(); + shred_collector_manager.join(); +} - const n_threads_snapshot_load: u32 = blk: { - const n_threads_snapshot_load: u32 = config.current.accounts_db.num_threads_snapshot_load; - if (n_threads_snapshot_load == 0) break :blk @truncate(try std.Thread.getCpuCount()); - break :blk n_threads_snapshot_load; +/// entrypoint to print the leader schedule and then exit +fn printLeaderSchedule() !void { + const allocator = gpa_allocator; + var app_base = try AppBase.init(allocator); + + const leader_schedule = try getLeaderScheduleFromCli(allocator) orelse b: { + app_base.logger.info("Downloading a snapshot to calculate the leader schedule."); + var gossip_service, var gossip_manager = try startGossip(allocator, &app_base, &.{}); + defer gossip_service.deinit(); + defer gossip_manager.deinit(); + const snapshot = try loadFromSnapshot(allocator, app_base.logger, gossip_service, false); + break :b try leaderScheduleFromBank(allocator, &snapshot.bank); }; - var accounts_db = try AccountsDB.init( - gpa_allocator, - logger, - 
config.current.accounts_db, - ); - defer accounts_db.deinit(false); // keep index files on disk - - const snapshot_fields = try accounts_db.loadWithDefaults( - &snapshots, - snapshot_dir_str, - n_threads_snapshot_load, - true, // validate too - ); - const bank_fields = snapshot_fields.bank_fields; - - // this should exist before we start to unpack - logger.infof("reading genesis...", .{}); - const genesis_config = readGenesisConfig(gpa_allocator, snapshot_dir_str) catch |err| { - if (err == error.GenesisNotFound) { - logger.errf("genesis.bin not found - expecting {s}/genesis.bin to exist", .{snapshot_dir_str}); - } - return err; - }; - defer genesis_config.deinit(gpa_allocator); - - logger.infof("validating bank...", .{}); - const bank = Bank.init(&accounts_db, &bank_fields); - try Bank.validateBankFields(bank.bank_fields, &genesis_config); - - // validate the status cache - logger.infof("validating status cache...", .{}); - var status_cache = readStatusCache(gpa_allocator, snapshot_dir_str) catch |err| { - if (err == error.StatusCacheNotFound) { - logger.errf("status-cache.bin not found - expecting {s}/snapshots/status-cache to exist", .{snapshot_dir_str}); - } - return err; - }; - defer status_cache.deinit(); + var stdout = std.io.bufferedWriter(std.io.getStdOut().writer()); + try writeLeaderSchedule(leader_schedule, stdout.writer()); + try stdout.flush(); +} - var slot_history = try accounts_db.getSlotHistory(); - defer slot_history.deinit(accounts_db.allocator); - try status_cache.validate(gpa_allocator, bank_fields.slot, &slot_history); +fn getLeaderScheduleFromCli(allocator: Allocator) !?SingleEpochLeaderSchedule { + return if (config.current.leader_schedule_path) |path| + if (std.mem.eql(u8, "--", path)) + try parseLeaderSchedule(allocator, std.io.getStdIn().reader()) + else + try parseLeaderSchedule(allocator, (try std.fs.cwd().openFile(path, .{})).reader()) + else + null; +} - logger.infof("accounts-db setup done...", .{}); +/// State that typically needs to be initialized at the start of the app, +/// and deinitialized only when the app exits. 
+const AppBase = struct { + exit: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + logger: Logger, + metrics_thread: std.Thread, + my_keypair: KeyPair, + entrypoints: std.ArrayList(SocketAddr), + shred_version: u16, + my_ip: IpAddr, + + fn init(allocator: Allocator) !AppBase { + var logger = try spawnLogger(); + errdefer logger.deinit(); + const metrics_thread = try spawnMetrics(logger); + errdefer metrics_thread.detach(); + const my_keypair = try getOrInitIdentity(allocator, logger); + const entrypoints = try getEntrypoints(logger); + errdefer entrypoints.deinit(); + const ip_echo_data = try getMyDataFromIpEcho(logger, entrypoints.items); + + return .{ + .logger = logger, + .metrics_thread = metrics_thread, + .my_keypair = my_keypair, + .entrypoints = entrypoints, + .shred_version = ip_echo_data.shred_version, + .my_ip = ip_echo_data.ip, + }; + } - gossip_handle.join(); - shred_collector_manager.join(); -} + pub fn deinit(self: @This()) void { + self.exit.store(true, .unordered); + self.entrypoints.deinit(); + self.metrics_thread.detach(); + self.logger.deinit(); + } +}; /// Initialize an instance of GossipService and configure with CLI arguments fn initGossip( @@ -525,6 +559,51 @@ fn initGossip( ); } +fn startGossip( + allocator: Allocator, + app_base: *AppBase, + /// Extra sockets to publish in gossip, other than the gossip socket + extra_sockets: []const struct { tag: SocketTag, port: u16 }, +) !struct { *GossipService, sig.utils.service_manager.ServiceManager } { + const gossip_port = config.current.gossip.port; + app_base.logger.infof("gossip host: {any}", .{app_base.my_ip}); + app_base.logger.infof("gossip port: {d}", .{gossip_port}); + + // setup contact info + const my_pubkey = Pubkey.fromPublicKey(&app_base.my_keypair.public_key); + var contact_info = ContactInfo.init(allocator, my_pubkey, getWallclockMs(), 0); + try contact_info.setSocket(.gossip, SocketAddr.init(app_base.my_ip, gossip_port)); + for (extra_sockets) |s| try contact_info.setSocket(s.tag, SocketAddr.init(app_base.my_ip, s.port)); + contact_info.shred_version = app_base.shred_version; + + var manager = sig.utils.service_manager.ServiceManager.init( + allocator, + app_base.logger, + &app_base.exit, + "gossip", + .{}, + .{}, + ); + const service = try manager.arena().create(GossipService); + service.* = try GossipService.init( + gpa_allocator, + gossip_value_gpa_allocator, + contact_info, + app_base.my_keypair, // TODO: consider security implication of passing keypair by value + app_base.entrypoints.items, + &app_base.exit, + app_base.logger, + ); + try manager.defers.deferCall(GossipService.deinit, .{service}); + + try service.start(.{ + .spy_node = config.current.gossip.spy_node, + .dump = config.current.gossip.dump, + }, &manager); + + return .{ service, manager }; +} + fn runGossipWithConfigValues(gossip_service: *GossipService) !void { const gossip_config = config.current.gossip; return gossip_service.run(.{ @@ -636,11 +715,110 @@ fn spawnLogger() !Logger { return logger; } +const LoadedSnapshot = struct { + allocator: Allocator, + accounts_db: AccountsDB, + snapshot_fields: sig.accounts_db.SnapshotFields, + /// contains pointers to `accounts_db` and `snapshot_fields` + bank: Bank, + genesis_config: GenesisConfig, + + pub fn deinit(self: *@This()) void { + self.genesis_config.deinit(self.allocator); + self.snapshot_fields.deinit(self.allocator); + self.accounts_db.deinit(false); // keep index files on disk + self.allocator.destroy(self); + } +}; + +fn loadFromSnapshot( + allocator: Allocator, + 
logger: Logger, + gossip_service: *GossipService, + validate_genesis: bool, +) !*LoadedSnapshot { + const output = try allocator.create(LoadedSnapshot); + errdefer allocator.destroy(output); + var snapshots = try getOrDownloadSnapshots( + allocator, + logger, + gossip_service, + ); + defer snapshots.deinit(allocator); + + logger.infof("full snapshot: {s}", .{snapshots.full_path}); + if (snapshots.incremental_path) |inc_path| { + logger.infof("incremental snapshot: {s}", .{inc_path}); + } + + // cli parsing + const snapshot_dir_str = config.current.accounts_db.snapshot_dir; + const n_threads_snapshot_load: u32 = blk: { + const n_threads_snapshot_load: u32 = config.current.accounts_db.num_threads_snapshot_load; + if (n_threads_snapshot_load == 0) break :blk @truncate(try std.Thread.getCpuCount()); + break :blk n_threads_snapshot_load; + }; + + output.accounts_db = try AccountsDB.init( + allocator, + logger, + config.current.accounts_db, + ); + errdefer output.accounts_db.deinit(false); + + output.snapshot_fields = try output.accounts_db.loadWithDefaults( + &snapshots, + snapshot_dir_str, + n_threads_snapshot_load, + true, // validate too + ); + errdefer output.snapshot_fields.deinit(allocator); + + const bank_fields = &output.snapshot_fields.bank_fields; + + // this should exist before we start to unpack + logger.infof("reading genesis...", .{}); + output.genesis_config = readGenesisConfig(allocator, snapshot_dir_str) catch |err| { + if (err == error.GenesisNotFound) { + logger.errf("genesis.bin not found - expecting {s}/genesis.bin to exist", .{snapshot_dir_str}); + } + return err; + }; + errdefer output.genesis_config.deinit(allocator); + + logger.infof("validating bank...", .{}); + output.bank = Bank.init(&output.accounts_db, bank_fields); + Bank.validateBankFields(output.bank.bank_fields, &output.genesis_config) catch |e| switch (e) { + // TODO: remove when genesis validation works on all clusters + error.BankAndGenesisMismatch => if (validate_genesis) { + return e; + } else { + logger.err("Bank failed genesis validation."); + }, + else => return e, + }; + + // validate the status cache + logger.infof("validating status cache...", .{}); + var status_cache = readStatusCache(allocator, snapshot_dir_str) catch |err| { + if (err == error.StatusCacheNotFound) { + logger.errf("status-cache.bin not found - expecting {s}/snapshots/status-cache to exist", .{snapshot_dir_str}); + } + return err; + }; + defer status_cache.deinit(); + + var slot_history = try output.accounts_db.getSlotHistory(); + defer slot_history.deinit(output.accounts_db.allocator); + try status_cache.validate(allocator, bank_fields.slot, &slot_history); + + logger.infof("accounts-db setup done...", .{}); + + return output; +} + /// load genesis config with default filenames -fn readGenesisConfig( - allocator: std.mem.Allocator, - snapshot_dir: []const u8, -) !GenesisConfig { +fn readGenesisConfig(allocator: Allocator, snapshot_dir: []const u8) !GenesisConfig { const genesis_path = try std.fmt.allocPrint( allocator, "{s}/genesis.bin", @@ -656,10 +834,7 @@ fn readGenesisConfig( return genesis_config; } -fn readStatusCache( - allocator: std.mem.Allocator, - snapshot_dir: []const u8, -) !StatusCache { +fn readStatusCache(allocator: Allocator, snapshot_dir: []const u8) !StatusCache { const status_cache_path = try std.fmt.allocPrint( gpa_allocator, "{s}/{s}", @@ -715,9 +890,7 @@ fn downloadSnapshot() !void { ); } -fn getTrustedValidators( - allocator: std.mem.Allocator, -) !?std.ArrayList(Pubkey) { +fn getTrustedValidators(allocator: 
Allocator) !?std.ArrayList(Pubkey) { var trusted_validators: ?std.ArrayList(Pubkey) = null; if (config.current.gossip.trusted_validators.len > 0) { trusted_validators = try std.ArrayList(Pubkey).initCapacity( @@ -735,7 +908,7 @@ fn getTrustedValidators( } fn getOrDownloadSnapshots( - allocator: std.mem.Allocator, + allocator: Allocator, logger: Logger, gossip_service: ?*GossipService, ) !SnapshotFieldsAndPaths { diff --git a/src/cmd/config.zig b/src/cmd/config.zig index 2577019d5..f0eb394eb 100644 --- a/src/cmd/config.zig +++ b/src/cmd/config.zig @@ -6,6 +6,7 @@ pub const Config = struct { gossip: GossipConfig = .{}, shred_collector: ShredCollectorConfig = shred_collector_defaults, accounts_db: AccountsDBConfig = .{}, + leader_schedule_path: ?[]const u8 = null, // general config log_level: []const u8 = "debug", metrics_port: u16 = 12345, diff --git a/src/core/leader_schedule.zig b/src/core/leader_schedule.zig new file mode 100644 index 000000000..0147bd2a6 --- /dev/null +++ b/src/core/leader_schedule.zig @@ -0,0 +1,390 @@ +const std = @import("std"); +const sig = @import("../lib.zig"); + +const Allocator = std.mem.Allocator; + +const Bank = sig.accounts_db.Bank; +const ChaChaRng = sig.rand.ChaChaRng; +const Epoch = sig.core.Epoch; +const Pubkey = sig.core.Pubkey; +const Slot = sig.core.Slot; +const WeightedRandomSampler = sig.rand.WeightedRandomSampler; + +pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4; + +pub const SlotLeaderProvider = sig.utils.closure.PointerClosure(Slot, ?Pubkey); + +/// Only works for a single epoch. This is a basic limited approach that should +/// only be used as a placeholder until a better approach is fleshed out. +pub const SingleEpochLeaderSchedule = struct { + allocator: Allocator, + slot_leaders: []const sig.core.Pubkey, + start_slot: Slot, + + const Self = @This(); + + pub fn deinit(self: Self) void { + self.allocator.free(self.slot_leaders); + } + + pub fn getLeader(self: *const Self, slot: Slot) ?sig.core.Pubkey { + const index: usize = @intCast(slot - self.start_slot); + return if (index >= self.slot_leaders.len) null else self.slot_leaders[index]; + } + + pub fn provider(self: *Self) SlotLeaderProvider { + return SlotLeaderProvider.init(self, Self.getLeader); + } +}; + +pub fn leaderScheduleFromBank(allocator: Allocator, bank: *const Bank) !SingleEpochLeaderSchedule { + const epoch = bank.bank_fields.epoch; + const epoch_stakes = bank.bank_fields.epoch_stakes.getPtr(epoch) orelse return error.NoEpochStakes; + const slots_in_epoch = bank.bank_fields.epoch_schedule.getSlotsInEpoch(epoch); + const staked_nodes = try epoch_stakes.stakes.vote_accounts.stakedNodes(allocator); + + const slot_leaders = try leaderSchedule(allocator, staked_nodes, slots_in_epoch, epoch); + + _, const slot_index = bank.bank_fields.epoch_schedule.getEpochAndSlotIndex(bank.bank_fields.slot); + const epoch_start_slot = bank.bank_fields.slot - slot_index; + return SingleEpochLeaderSchedule{ + .allocator = allocator, + .slot_leaders = slot_leaders, + .start_slot = epoch_start_slot, + }; +} + +pub const StakedNode = struct { id: Pubkey, stake: u64 }; + +pub fn leaderSchedule( + allocator: Allocator, + staked_nodes: *const std.AutoHashMap(Pubkey, u64), + slots_in_epoch: Slot, + epoch: Epoch, +) Allocator.Error![]Pubkey { + const Entry = std.AutoHashMap(Pubkey, u64).Entry; + + const nodes = try allocator.alloc(Entry, staked_nodes.count()); + defer allocator.free(nodes); + var iter = staked_nodes.iterator(); + var index: usize = 0; + while (iter.next()) |staked_node_entry| : (index += 1) { 
+ nodes[index] = staked_node_entry; + } + std.mem.sortUnstable(Entry, nodes, {}, struct { + fn gt(_: void, lhs: Entry, rhs: Entry) bool { + return switch (std.math.order(lhs.value_ptr.*, rhs.value_ptr.*)) { + .gt => true, + .lt => false, + .eq => .gt == std.mem.order(u8, &lhs.key_ptr.data, &rhs.key_ptr.data), + }; + } + }.gt); + + // init random number generator + var seed: [32]u8 = .{0} ** 32; + std.mem.writeInt(Epoch, seed[0..@sizeOf(Epoch)], epoch, .little); + var rng = ChaChaRng(20).fromSeed(seed); + const random = rng.random(); + + // init sampler from stake weights + const stakes = try allocator.alloc(u64, nodes.len); + defer allocator.free(stakes); + for (nodes, 0..) |entry, i| stakes[i] = entry.value_ptr.*; + var sampler = try WeightedRandomSampler(u64).init(allocator, random, stakes); + defer sampler.deinit(); + + // calculate leader schedule + const slot_leaders = try allocator.alloc(Pubkey, slots_in_epoch); + var current_node: Pubkey = undefined; + for (0..slots_in_epoch) |i| { + if (i % NUM_CONSECUTIVE_LEADER_SLOTS == 0) { + current_node = nodes[sampler.sample()].key_ptr.*; + } + slot_leaders[i] = current_node; + } + + return slot_leaders; +} + +pub fn writeLeaderSchedule(sched: SingleEpochLeaderSchedule, writer: anytype) !void { + for (sched.slot_leaders, 0..) |leader, i| { + try writer.print(" {} {s}\n", .{ i + sched.start_slot, &leader.string() }); + } +} + +/// Parses the leader schedule as formatted by the `solana leader-schedule` and +/// `sig leader-schedule` commands. +pub fn parseLeaderSchedule( + allocator: std.mem.Allocator, + reader: anytype, +) !SingleEpochLeaderSchedule { + var slot_leaders = std.ArrayList(Pubkey).init(allocator); + var start_slot: Slot = 0; + var expect: ?Slot = null; + var row: [256]u8 = undefined; + while (true) { + const line = reader.readUntilDelimiter(&row, '\n') catch |e| switch (e) { + error.EndOfStream => break, + else => return e, + }; + var word_iter = std.mem.split(u8, line, " "); + const slot = try std.fmt.parseInt(Slot, nextNonEmpty(&word_iter) orelse continue, 10); + if (expect) |*exp_slot| { + if (slot != exp_slot.*) { + return error.Discontinuity; + } + exp_slot.* += 1; + } else { + expect = slot + 1; + start_slot = slot; + } + const node_str = nextNonEmpty(&word_iter) orelse return error.MissingPubkey; + try slot_leaders.append(try Pubkey.fromString(node_str)); + } + return .{ + .allocator = allocator, + .slot_leaders = try allocator.realloc( + slot_leaders.items.ptr[0..slot_leaders.capacity], + slot_leaders.items.len, + ), + .start_slot = start_slot, + }; +} + +fn nextNonEmpty(word_iter: anytype) ?[]const u8 { + while (word_iter.next()) |word| if (word.len > 0) return word; + return null; +} + +test "leaderSchedule calculation matches agave" { + var rng = ChaChaRng(20).fromSeed(.{0} ** 32); + const random = rng.random(); + var pubkey_bytes: [32]u8 = undefined; + var staked_nodes = std.AutoHashMap(Pubkey, u64).init(std.testing.allocator); + defer staked_nodes.deinit(); + for (0..100) |_| { + random.bytes(&pubkey_bytes); + const key = Pubkey{ .data = pubkey_bytes }; + const stake = random.int(u64) / 1000; + try staked_nodes.put(key, stake); + } + const slot_leaders = try leaderSchedule(std.testing.allocator, &staked_nodes, 321, 123); + defer std.testing.allocator.free(slot_leaders); + for (slot_leaders, 0..) 
|slot_leader, i| { + try std.testing.expect((try Pubkey.fromString(generated_leader_schedule[i])).equals(&slot_leader)); + } +} + +test "parseLeaderSchedule writeLeaderSchedule happy path roundtrip" { + const allocator = std.testing.allocator; + const input_file = + \\ 270864000 Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk + \\ 270864001 Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk + \\ 270864002 Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk + \\ 270864003 Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk + \\ 270864004 GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8 + \\ 270864005 GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8 + \\ 270864006 GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8 + \\ 270864007 GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8 + \\ 270864008 DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP + \\ 270864009 DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP + \\ 270864010 DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP + \\ + ; + const expected_nodes = [_]Pubkey{ + try Pubkey.fromString("Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk"), + try Pubkey.fromString("Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk"), + try Pubkey.fromString("Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk"), + try Pubkey.fromString("Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk"), + try Pubkey.fromString("GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8"), + try Pubkey.fromString("GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8"), + try Pubkey.fromString("GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8"), + try Pubkey.fromString("GBuP6xK2zcUHbQuUWM4gbBjom46AomsG8JzSp1bzJyn8"), + try Pubkey.fromString("DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP"), + try Pubkey.fromString("DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP"), + try Pubkey.fromString("DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP"), + }; + const expected_start = 270864000; + + // parse input file + var stream = std.io.fixedBufferStream(input_file); + const leader_schedule = try parseLeaderSchedule(allocator, stream.reader()); + defer leader_schedule.deinit(); + try std.testing.expect(expected_start == leader_schedule.start_slot); + try std.testing.expect(expected_nodes.len == leader_schedule.slot_leaders.len); + for (expected_nodes, leader_schedule.slot_leaders) |expected, actual| { + try std.testing.expect(expected.equals(&actual)); + } + + // write file out + var out_buf: [2 * input_file.len]u8 = undefined; + var out_stream = std.io.fixedBufferStream(&out_buf); + try writeLeaderSchedule(leader_schedule, out_stream.writer()); + const out_file = out_stream.getWritten(); + try std.testing.expect(std.mem.eql(u8, out_file, input_file)); +} + +const generated_leader_schedule = [_][]const u8{ + "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", + "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", + "AvsmCG8R1qGJtRvjqudkX974ihfbYZUVf4t515tzxyHv", "AvsmCG8R1qGJtRvjqudkX974ihfbYZUVf4t515tzxyHv", + "AvsmCG8R1qGJtRvjqudkX974ihfbYZUVf4t515tzxyHv", "AvsmCG8R1qGJtRvjqudkX974ihfbYZUVf4t515tzxyHv", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "CrFNvAe9JJkW9yzmtUMWpA4GMvRqWz88EJLPyhrxmFzd", "CrFNvAe9JJkW9yzmtUMWpA4GMvRqWz88EJLPyhrxmFzd", + "CrFNvAe9JJkW9yzmtUMWpA4GMvRqWz88EJLPyhrxmFzd", "CrFNvAe9JJkW9yzmtUMWpA4GMvRqWz88EJLPyhrxmFzd", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + 
"Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", + "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", + "4SNVzDbWzQmUWRVb5BNXifV4NYGoYA8evhpSFfy5pSaN", "4SNVzDbWzQmUWRVb5BNXifV4NYGoYA8evhpSFfy5pSaN", + "4SNVzDbWzQmUWRVb5BNXifV4NYGoYA8evhpSFfy5pSaN", "4SNVzDbWzQmUWRVb5BNXifV4NYGoYA8evhpSFfy5pSaN", + "5nvNxUpHfZ2FSRPbDtDyMeFrvN5YBFBvokoYobe2qgqH", "5nvNxUpHfZ2FSRPbDtDyMeFrvN5YBFBvokoYobe2qgqH", + "5nvNxUpHfZ2FSRPbDtDyMeFrvN5YBFBvokoYobe2qgqH", "5nvNxUpHfZ2FSRPbDtDyMeFrvN5YBFBvokoYobe2qgqH", + "8zScg5nWKZEzFJnhPu5s9zBeLRHTTjvfcw2aKWDRJNDt", "8zScg5nWKZEzFJnhPu5s9zBeLRHTTjvfcw2aKWDRJNDt", + "8zScg5nWKZEzFJnhPu5s9zBeLRHTTjvfcw2aKWDRJNDt", "8zScg5nWKZEzFJnhPu5s9zBeLRHTTjvfcw2aKWDRJNDt", + "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", + "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "8LetEQHxeoVTMFoQiett6odLZTwKw1SLYnC6UiNHaNC9", "8LetEQHxeoVTMFoQiett6odLZTwKw1SLYnC6UiNHaNC9", + "8LetEQHxeoVTMFoQiett6odLZTwKw1SLYnC6UiNHaNC9", "8LetEQHxeoVTMFoQiett6odLZTwKw1SLYnC6UiNHaNC9", + "Gh8qe5sKntpd7RMhLSy52CZcwEZZVrPujNxFc6FCsXSC", "Gh8qe5sKntpd7RMhLSy52CZcwEZZVrPujNxFc6FCsXSC", + "Gh8qe5sKntpd7RMhLSy52CZcwEZZVrPujNxFc6FCsXSC", "Gh8qe5sKntpd7RMhLSy52CZcwEZZVrPujNxFc6FCsXSC", + "F8RUGg4CfVvsHGH38aAR3s2nsw7Faw2QjhsWomScgEtb", "F8RUGg4CfVvsHGH38aAR3s2nsw7Faw2QjhsWomScgEtb", + "F8RUGg4CfVvsHGH38aAR3s2nsw7Faw2QjhsWomScgEtb", "F8RUGg4CfVvsHGH38aAR3s2nsw7Faw2QjhsWomScgEtb", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", + "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", "ALiW6m6KxrY98DkUhToCU8eLfgrQ73Zuo3eh9phWmbJV", + "GmNETdrkoh2trUWJ4bHP6TLsHw2jcbdwTDyKYToYNTqU", "GmNETdrkoh2trUWJ4bHP6TLsHw2jcbdwTDyKYToYNTqU", + "GmNETdrkoh2trUWJ4bHP6TLsHw2jcbdwTDyKYToYNTqU", "GmNETdrkoh2trUWJ4bHP6TLsHw2jcbdwTDyKYToYNTqU", + "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + "Ar17KaAgMEiGVVRQqo3ta7jfeAksR3JcXaRsiDSBxgz3", "Ar17KaAgMEiGVVRQqo3ta7jfeAksR3JcXaRsiDSBxgz3", + "Ar17KaAgMEiGVVRQqo3ta7jfeAksR3JcXaRsiDSBxgz3", "Ar17KaAgMEiGVVRQqo3ta7jfeAksR3JcXaRsiDSBxgz3", + "EgYg66jU5q678BdPGEPj1fyobsPXzwLoxz2uvvSUQ2zG", "EgYg66jU5q678BdPGEPj1fyobsPXzwLoxz2uvvSUQ2zG", + "EgYg66jU5q678BdPGEPj1fyobsPXzwLoxz2uvvSUQ2zG", "EgYg66jU5q678BdPGEPj1fyobsPXzwLoxz2uvvSUQ2zG", + "EtnzJyeepGFXSJZ7EWqi1kXYi2zpgFMdUtDx1ovRTi75", "EtnzJyeepGFXSJZ7EWqi1kXYi2zpgFMdUtDx1ovRTi75", + 
"EtnzJyeepGFXSJZ7EWqi1kXYi2zpgFMdUtDx1ovRTi75", "EtnzJyeepGFXSJZ7EWqi1kXYi2zpgFMdUtDx1ovRTi75", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "3dMU1xcDSXzaG9mFB8N6ySKsSE1AknaxPAYBxCs83qsn", "3dMU1xcDSXzaG9mFB8N6ySKsSE1AknaxPAYBxCs83qsn", + "3dMU1xcDSXzaG9mFB8N6ySKsSE1AknaxPAYBxCs83qsn", "3dMU1xcDSXzaG9mFB8N6ySKsSE1AknaxPAYBxCs83qsn", + "36hQwDVUzUBqij3vukdrjGogxjH1qzve66vMLLHgkoNG", "36hQwDVUzUBqij3vukdrjGogxjH1qzve66vMLLHgkoNG", + "36hQwDVUzUBqij3vukdrjGogxjH1qzve66vMLLHgkoNG", "36hQwDVUzUBqij3vukdrjGogxjH1qzve66vMLLHgkoNG", + "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", + "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", + "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", + "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", + "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", + "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", + "4Lu8CGxdYgXAHKUMU3bN4BnDSYdcqniNV4WFJ2GiY5wz", "4Lu8CGxdYgXAHKUMU3bN4BnDSYdcqniNV4WFJ2GiY5wz", + "4Lu8CGxdYgXAHKUMU3bN4BnDSYdcqniNV4WFJ2GiY5wz", "4Lu8CGxdYgXAHKUMU3bN4BnDSYdcqniNV4WFJ2GiY5wz", + "9BF6Dt4ELaWvZ88sdKkwx6LPvo51w7A3FG5dqTFBnNC6", "9BF6Dt4ELaWvZ88sdKkwx6LPvo51w7A3FG5dqTFBnNC6", + "9BF6Dt4ELaWvZ88sdKkwx6LPvo51w7A3FG5dqTFBnNC6", "9BF6Dt4ELaWvZ88sdKkwx6LPvo51w7A3FG5dqTFBnNC6", + "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", + "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", "8GfdNsue2yP6dagvMwK9YKWKhxteELz1JGeMdy7b3Xtp", + "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", + "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", "ChZKtGvACLPovxKJLUtnDyNHiPLECoXsneziARENU8kV", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", + "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", + "4XVPmBXM6bfJdUqkLfAxS6t4fsZS9D3rSZd3M835u91H", "4XVPmBXM6bfJdUqkLfAxS6t4fsZS9D3rSZd3M835u91H", + "4XVPmBXM6bfJdUqkLfAxS6t4fsZS9D3rSZd3M835u91H", "4XVPmBXM6bfJdUqkLfAxS6t4fsZS9D3rSZd3M835u91H", + "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", + "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", "HzAQzrCnH7VAaxAVSahEs6WcBcW38bi3ZLarZaZ1YVR4", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + 
"8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "J8pKv47cms17Qav9s97pJjKhQLRvmQbxGMLKVe7QXF7P", "J8pKv47cms17Qav9s97pJjKhQLRvmQbxGMLKVe7QXF7P", + "J8pKv47cms17Qav9s97pJjKhQLRvmQbxGMLKVe7QXF7P", "J8pKv47cms17Qav9s97pJjKhQLRvmQbxGMLKVe7QXF7P", + "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", + "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", "A5TCWz8baPdYYgeCa5scXwNEUmnsYwbWbmGLbUSYergs", + "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", + "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "DWUt7KxRWF8GdFhTYdM4ZFA3rQ9roKrRXUcm8Xeeywkq", "DWUt7KxRWF8GdFhTYdM4ZFA3rQ9roKrRXUcm8Xeeywkq", + "DWUt7KxRWF8GdFhTYdM4ZFA3rQ9roKrRXUcm8Xeeywkq", "DWUt7KxRWF8GdFhTYdM4ZFA3rQ9roKrRXUcm8Xeeywkq", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "Esqc3WZPLR1XkvapZxRqFTQa6UxGQM4Lamqt6duLFEtj", "Esqc3WZPLR1XkvapZxRqFTQa6UxGQM4Lamqt6duLFEtj", + "Esqc3WZPLR1XkvapZxRqFTQa6UxGQM4Lamqt6duLFEtj", "Esqc3WZPLR1XkvapZxRqFTQa6UxGQM4Lamqt6duLFEtj", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", "8NaFkAtLW8qo4VdgUuU1VAD3nuKZxpDFMGpeg1ajeCSZ", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", "5W6GpY2dKVsks2QF1EdrDSasdM1f9KqVNVFbZTzBy8V", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", "Ay73RcvjzYq43dTv32CzTEhddBBQJL6J5JnzbJjTFQZN", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", "DCKCfRPPfHUHmJz7ejnLCQodkhsuKj51exdXUBWEivQB", + "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", + "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", "ErAWjNHKa2oChJcqdyoCXC5ZZsLqpGwphcbmMxTEwmsZ", + "9CsaB86comVhyFqtDrALpBHhaBHGmf13iBJL7JDWV9p2", "9CsaB86comVhyFqtDrALpBHhaBHGmf13iBJL7JDWV9p2", + "9CsaB86comVhyFqtDrALpBHhaBHGmf13iBJL7JDWV9p2", "9CsaB86comVhyFqtDrALpBHhaBHGmf13iBJL7JDWV9p2", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", + 
"FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", + "6b4LnC2vhdfS5MqjBSwWMFdJwA9tgbu2ezEcspeKVYSn", "6b4LnC2vhdfS5MqjBSwWMFdJwA9tgbu2ezEcspeKVYSn", + "6b4LnC2vhdfS5MqjBSwWMFdJwA9tgbu2ezEcspeKVYSn", "6b4LnC2vhdfS5MqjBSwWMFdJwA9tgbu2ezEcspeKVYSn", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "8gxPGDZK4G8qzW7zsRpw8MW84rRpeUS6vj8CGrPbYdyk", "8gxPGDZK4G8qzW7zsRpw8MW84rRpeUS6vj8CGrPbYdyk", + "8gxPGDZK4G8qzW7zsRpw8MW84rRpeUS6vj8CGrPbYdyk", "8gxPGDZK4G8qzW7zsRpw8MW84rRpeUS6vj8CGrPbYdyk", + "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", + "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", + "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + "GwckxXocVzxE8Ao1nWrW6QmBCLiy3k5DuLE2uEV2RHAq", "GwckxXocVzxE8Ao1nWrW6QmBCLiy3k5DuLE2uEV2RHAq", + "GwckxXocVzxE8Ao1nWrW6QmBCLiy3k5DuLE2uEV2RHAq", "GwckxXocVzxE8Ao1nWrW6QmBCLiy3k5DuLE2uEV2RHAq", + "Gkkp1TrPTWZLRE88HNZvRfUh2Hjpgti9g7AqQv88cf9", "Gkkp1TrPTWZLRE88HNZvRfUh2Hjpgti9g7AqQv88cf9", + "Gkkp1TrPTWZLRE88HNZvRfUh2Hjpgti9g7AqQv88cf9", "Gkkp1TrPTWZLRE88HNZvRfUh2Hjpgti9g7AqQv88cf9", + "HCbPW8qzM3feTpyYrA1HS5byK7PqBq4m1cvZRuQT6yb1", "HCbPW8qzM3feTpyYrA1HS5byK7PqBq4m1cvZRuQT6yb1", + "HCbPW8qzM3feTpyYrA1HS5byK7PqBq4m1cvZRuQT6yb1", "HCbPW8qzM3feTpyYrA1HS5byK7PqBq4m1cvZRuQT6yb1", + "3r3Tzsck7WPJbsbPM9yhC8fsBQtjFFBwrGX2ct394Bef", "3r3Tzsck7WPJbsbPM9yhC8fsBQtjFFBwrGX2ct394Bef", + "3r3Tzsck7WPJbsbPM9yhC8fsBQtjFFBwrGX2ct394Bef", "3r3Tzsck7WPJbsbPM9yhC8fsBQtjFFBwrGX2ct394Bef", + "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", + "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", "EiLPxcwYe8akU9g6C6A99j5mep3N7A4ySfMNunC3qMjQ", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", "DxMVzBzTuX2VprSQEvtKraPR5JVMSgx4rqyASG4xEVNW", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", + "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", "FLG8C3rziE56N3jib3NPB1TcTrGJXeTmLLwvPCW337PA", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", "6BSbQZxg86LrAtscs3cezszNJmJhRWErG72VWavECEz6", + "9qBV9MtqqSSt4pt8XvX8URn2fLQqNPWzcYiBx8rcAgiX", "9qBV9MtqqSSt4pt8XvX8URn2fLQqNPWzcYiBx8rcAgiX", + "9qBV9MtqqSSt4pt8XvX8URn2fLQqNPWzcYiBx8rcAgiX", "9qBV9MtqqSSt4pt8XvX8URn2fLQqNPWzcYiBx8rcAgiX", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", "78DHaJZHsmTj5g6xPQJa5pP99Tg6MgG8zQVqcKG5Zq7x", + "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", + "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", "Hq6Tke5EnrpDADM4sTcfMZoSLwsNCNz4pmHpCoc4UdY9", + "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + 
"GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", "GGZKvA54JkUQ66NqkLAeo7uqu9dJdneXm9gsPqTbNEMY", + "EffB1PCz4fwqLGD9ko1bkRTFVHekedCTX83az91Rdbo2", "EffB1PCz4fwqLGD9ko1bkRTFVHekedCTX83az91Rdbo2", + "EffB1PCz4fwqLGD9ko1bkRTFVHekedCTX83az91Rdbo2", "EffB1PCz4fwqLGD9ko1bkRTFVHekedCTX83az91Rdbo2", + "AuDjtyKmix6vLHBsfouA82GQrmJ4JRWRPqCEcD54kkkH", "AuDjtyKmix6vLHBsfouA82GQrmJ4JRWRPqCEcD54kkkH", + "AuDjtyKmix6vLHBsfouA82GQrmJ4JRWRPqCEcD54kkkH", "AuDjtyKmix6vLHBsfouA82GQrmJ4JRWRPqCEcD54kkkH", + "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", + "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", "HU1g6zZ3LrXJeFYEmDAekv44kAv9XE8g8FQYz3rSrmNY", + "DUUJtWATcHjMhNHGh9pPud3HGp4yrrZe1tE7ELHXUAB6", +}; diff --git a/src/core/lib.zig b/src/core/lib.zig index c2d078d21..b3f652487 100644 --- a/src/core/lib.zig +++ b/src/core/lib.zig @@ -1,6 +1,7 @@ pub const account = @import("account.zig"); pub const hard_forks = @import("hard_forks.zig"); pub const hash = @import("hash.zig"); +pub const leader_schedule = @import("leader_schedule.zig"); pub const pubkey = @import("pubkey.zig"); pub const shred = @import("shred.zig"); pub const signature = @import("signature.zig"); diff --git a/src/core/pubkey.zig b/src/core/pubkey.zig index 5610d6992..48cd9bf64 100644 --- a/src/core/pubkey.zig +++ b/src/core/pubkey.zig @@ -16,7 +16,7 @@ pub const Pubkey = extern struct { pub fn fromString(str: []const u8) !Self { var out: [32]u8 = undefined; const written = decoder.decode(str, &out) catch return Error.InvalidEncodedValue; - if (written != 32) @panic("written is not 32"); + if (written != 32) return Error.InvalidBytesLength; return Self{ .data = out }; } diff --git a/src/gossip/service.zig b/src/gossip/service.zig index e67d0d2bc..77984dab2 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -48,6 +48,7 @@ const Ping = sig.gossip.ping_pong.Ping; const Pong = sig.gossip.ping_pong.Pong; const PingCache = sig.gossip.ping_pong.PingCache; const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr; +const ServiceManager = sig.utils.service_manager.ServiceManager; const endpointToString = sig.net.endpointToString; const globalRegistry = sig.prometheus.globalRegistry; @@ -272,36 +273,20 @@ pub const GossipService = struct { deinitMux(&self.failed_pull_hashes_mux); } - pub const RunHandles = struct { - exit: *AtomicBool, - receiver_thread: std.Thread, - packet_verifier_thread: std.Thread, - message_processor_thread: std.Thread, - message_builder_thread: ?std.Thread, - responder_thread: std.Thread, - dumper_thread: ?std.Thread, - - /// If any of the threads join, all other threads will be signalled to join. - pub fn joinAndExit(handles: RunHandles) void { - inline for (@typeInfo(RunHandles).Struct.fields, 0..) |field, i| cont: { - comptime if (@field(std.meta.FieldEnum(RunHandles), field.name) == .exit) { - std.debug.assert(field.type == *AtomicBool); - continue; - }; - const maybe_thread: ?std.Thread = @field(handles, field.name); - const thread = maybe_thread orelse break :cont; - thread.join(); // if we end up joining, something's gone wrong, so signal exit - if (i == 0) handles.exit.store(true, .unordered); - } - } - }; - pub const RunThreadsParams = struct { spy_node: bool, dump: bool, }; - /// spawns required threads for the gossip serivce. 
+ /// starts gossip and blocks until it exits + pub fn run(self: *Self, params: RunThreadsParams) !void { + var manager = ServiceManager.init(self.allocator, self.logger, self.exit, "gossip", .{}, .{}); + try self.start(params, &manager); + manager.join(); + manager.deinit(); + } + + /// spawns required threads for the gossip service and returns immediately /// including: /// 1) socket reciever /// 2) packet verifier @@ -309,78 +294,42 @@ pub const GossipService = struct { /// 4) build message loop (to send outgoing message) (only active if not a spy node) /// 5) a socket responder (to send outgoing packets) /// 6) echo server - pub fn runThreads( + pub fn start( self: *Self, params: RunThreadsParams, - ) std.Thread.SpawnError!RunHandles { - const spy_node = params.spy_node; - const dump = params.dump; - + manager: *ServiceManager, + ) (std.mem.Allocator.Error || std.Thread.SpawnError)!void { // TODO(Ahmad): need new server impl, for now we don't join server thread // because http.zig's server doesn't stop when you call server.stop() - it's broken // const echo_server_thread = try self.echo_server.listenAndServe(); // _ = echo_server_thread; + errdefer manager.deinit(); - const exitAndJoin = struct { - inline fn exitAndJoin(exit: *AtomicBool, thread: std.Thread) void { - exit.store(true, .unordered); - thread.join(); - } - }.exitAndJoin; - - const receiver_thread = try Thread.spawn(.{}, socket_utils.readSocket, .{ + try manager.spawn("gossip readSocket", socket_utils.readSocket, .{ self.allocator, self.gossip_socket, self.packet_incoming_channel, self.exit, self.logger, }); - errdefer exitAndJoin(self.exit, receiver_thread); + try manager.spawn("gossip verifyPackets", verifyPackets, .{self}); + try manager.spawn("gossip processMessages", processMessages, .{ self, self.gossip_value_allocator }); - const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{self}); - errdefer exitAndJoin(self.exit, packet_verifier_thread); + if (!params.spy_node) try manager.spawn("gossip buildMessages", buildMessages, .{self}); - const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, self.gossip_value_allocator }); - errdefer exitAndJoin(self.exit, message_processor_thread); - - const maybe_message_builder_thread: ?std.Thread = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{self}) else null; - errdefer if (maybe_message_builder_thread) |thread| { - exitAndJoin(self.exit, thread); - }; - - const responder_thread = try Thread.spawn(.{}, socket_utils.sendSocket, .{ + try manager.spawn("gossip sendSocket", socket_utils.sendSocket, .{ self.gossip_socket, self.packet_outgoing_channel, self.exit, self.logger, }); - errdefer exitAndJoin(self.exit, responder_thread); - const maybe_dumper_thread: ?std.Thread = if (dump) try Thread.spawn(.{}, GossipDumpService.run, .{.{ + if (params.dump) try manager.spawn("GossipDumpService", GossipDumpService.run, .{.{ .allocator = self.allocator, .logger = self.logger, .gossip_table_rw = &self.gossip_table_rw, .exit = self.exit, - }}) else null; - errdefer if (maybe_dumper_thread) |thread| { - exitAndJoin(self.exit, thread); - }; - - return .{ - .exit = self.exit, - - .receiver_thread = receiver_thread, - .packet_verifier_thread = packet_verifier_thread, - .message_processor_thread = message_processor_thread, - .message_builder_thread = maybe_message_builder_thread, - .responder_thread = responder_thread, - .dumper_thread = maybe_dumper_thread, - }; - } - - pub fn run(self: *Self, params: RunThreadsParams) !void { - const run_handles = try 
self.runThreads(params); - defer run_handles.joinAndExit(); + }}); } const VerifyMessageTask = ThreadPoolTask(VerifyMessageEntry); diff --git a/src/lib.zig b/src/lib.zig index bbd25ff86..93ec05e3c 100644 --- a/src/lib.zig +++ b/src/lib.zig @@ -6,6 +6,7 @@ pub const crypto = @import("crypto/fnv.zig"); pub const gossip = @import("gossip/lib.zig"); pub const net = @import("net/lib.zig"); pub const prometheus = @import("prometheus/lib.zig"); +pub const rand = @import("rand/rand.zig"); pub const rpc = @import("rpc/lib.zig"); pub const shred_collector = @import("shred_collector/lib.zig"); pub const sync = @import("sync/lib.zig"); diff --git a/src/rand/chacha.zig b/src/rand/chacha.zig new file mode 100644 index 000000000..698249036 --- /dev/null +++ b/src/rand/chacha.zig @@ -0,0 +1,524 @@ +//! Port of ChaCha from the `rand_chacha` crate. +//! +//! Generates the same pseudorandom numbers as rand_chacha, unlike Zig std's +//! ChaCha. +//! +//! This is needed since rand_chacha differs from Zig std's ChaCha in several +//! ways. One example is that it does not comply with the IETF standard, plus +//! there are other compatibility issues that require a different design from Zig +//! std, like how it maintains state across iterations. + +const std = @import("std"); +const builtin = @import("builtin"); +const sig = @import("../lib.zig"); + +const mem = std.mem; + +const BlockRng = sig.rand.BlockRng; + +const endian = builtin.cpu.arch.endian(); + +/// A random number generator based on ChaCha. +/// Generates the same stream as ChaChaRng in `rand_chacha`. +/// This is an ease-of-use wrapper for the type: +/// BlockRng(ChaCha(rounds), ChaCha(rounds).generate) +pub fn ChaChaRng(comptime rounds: usize) type { + return struct { + block_rng: BlockRng(ChaCha(rounds), ChaCha(rounds).generate), + + const Self = @This(); + + pub fn fromSeed(seed: [32]u8) Self { + return .{ .block_rng = .{ .core = ChaCha(rounds).init(seed, .{0} ** 12) } }; + } + + pub fn random(self: *Self) std.rand.Random { + return self.block_rng.random(); + } + }; +} + +/// Computes the chacha stream. +/// +/// This is the barebones implementation of the chacha stream cipher. If you're +/// looking for a random number generator based on the chacha stream cipher, use +/// ChaChaRng. +pub fn ChaCha(comptime rounds: usize) type { + return struct { + b: [4]u32, + c: [4]u32, + d: [4]u32, + + const Self = @This(); + + pub fn init(key: [32]u8, nonce: [12]u8) Self { + const ctr_nonce = .{0} ++ leIntBitCast([3]u32, nonce); + return .{ + .b = leIntBitCast([4]u32, key[0..16].*), + .c = leIntBitCast([4]u32, key[16..].*), + .d = ctr_nonce, + }; + } + + /// Run the full chacha algorithm, generating the next block of 64 32-bit + /// integers. 
+ pub fn generate(self: *Self, out: *[64]u32) void { + const k = comptime leIntBitCast([4]u32, @as([16]u8, "expand 32-byte k".*)); + const b = self.b; + const c = self.c; + var x = State{ + .a = .{ k, k, k, k }, + .b = .{ b, b, b, b }, + .c = .{ c, c, c, c }, + .d = repeat4timesAndAdd0123(self.d), + }; + for (0..rounds / 2) |_| { + x = diagonalize(round(diagonalize(round(x), 1)), -1); + } + const sb = self.b; + const sc = self.c; + const sd = repeat4timesAndAdd0123(self.d); + const results: [64]u32 = @bitCast(transpose4(.{ + wrappingAddEachInt(x.a, .{ k, k, k, k }), + wrappingAddEachInt(x.b, .{ sb, sb, sb, sb }), + wrappingAddEachInt(x.c, .{ sc, sc, sc, sc }), + wrappingAddEachInt(x.d, sd), + })); + @memcpy(out[0..64], &results); + self.d = wrappingAddToFirstHalf(sd[0], 4); + } + }; +} + +const State = struct { + a: [4][4]u32, + b: [4][4]u32, + c: [4][4]u32, + d: [4][4]u32, + + const Self = @This(); + + fn eql(self: *const Self, other: *const Self) bool { + inline for (.{ "a", "b", "c", "d" }) |field_name| { + const lhs = @field(self, field_name); + const rhs = @field(other, field_name); + for (0..4) |i| { + if (!mem.eql(u32, &lhs[i], &rhs[i])) return false; + } + } + return true; + } +}; + +fn transpose4(a: [4][4][4]u32) [4][4][4]u32 { + return .{ + .{ a[0][0], a[1][0], a[2][0], a[3][0] }, + .{ a[0][1], a[1][1], a[2][1], a[3][1] }, + .{ a[0][2], a[1][2], a[2][2], a[3][2] }, + .{ a[0][3], a[1][3], a[2][3], a[3][3] }, + }; +} + +/// converts the first two items into a u64 and then wrapping_adds the integer +/// `i` to it, then converts back to u32s. +fn wrappingAddToFirstHalf(d: [4]u32, i: u64) [4]u32 { + var u64s = leIntBitCast([2]u64, d); + u64s[0] += i; + return leIntBitCast([4]u32, u64s); +} + +fn repeat4timesAndAdd0123(d: [4]u32) [4][4]u32 { + return .{ + wrappingAddToFirstHalf(d, 0), + wrappingAddToFirstHalf(d, 1), + wrappingAddToFirstHalf(d, 2), + wrappingAddToFirstHalf(d, 3), + }; +} + +/// Run a single round of the ChaCha algorithm +fn round(state: State) State { + var x = state; + x.a = wrappingAddEachInt(x.a, x.b); + x.d = xorThenRotateRight(x.d, x.a, 16); + x.c = wrappingAddEachInt(x.c, x.d); + x.b = xorThenRotateRight(x.b, x.c, 20); + x.a = wrappingAddEachInt(x.a, x.b); + x.d = xorThenRotateRight(x.d, x.a, 24); + x.c = wrappingAddEachInt(x.c, x.d); + x.b = xorThenRotateRight(x.b, x.c, 25); + return x; +} + +fn wrappingAddEachInt(a: [4][4]u32, b: [4][4]u32) [4][4]u32 { + var sum: [4][4]u32 = undefined; + for (0..4) |i| for (0..4) |j| { + sum[i][j] = a[i][j] +% b[i][j]; + }; + return sum; +} + +fn xorThenRotateRight(const_lhs: [4][4]u32, rhs: [4][4]u32, rotate: anytype) [4][4]u32 { + var lhs = const_lhs; + for (0..4) |i| for (0..4) |j| { + const xor = lhs[i][j] ^ rhs[i][j]; + lhs[i][j] = std.math.rotr(u32, xor, rotate); + }; + return lhs; +} + +/// Reinterprets an integer or array of integers as an integer or array of +/// integers with different sizes. For example, can convert u64 -> [2]u32 or vice +/// versa. +/// +/// The function ensures that the resulting numbers are universal across +/// platforms, using little-endian ordering. +/// +/// So, this is the same as @bitCast for little endian platforms, but it requires +/// a byte swap for big endian platforms. 
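+///
+/// For example, leIntBitCast([2]u32, @as(u64, 0x1122334455667788)) yields
+/// .{ 0x55667788, 0x11223344 } on both little- and big-endian targets.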
+fn leIntBitCast(comptime Output: type, input: anytype) Output { + switch (endian) { + .little => return @bitCast(input), + .big => { + if (numItems(Output) > numItems(@TypeOf(input))) { + var in = input; + for (&in) |*n| n.* = @byteSwap(n); + return @bitCast(in); + } else { + var out: Output = @bitCast(input); + for (&out) |*n| n.* = @byteSwap(n); + return out; + } + }, + } +} + +/// len of array, or 1 if not array. +fn numItems(comptime T: type) usize { + return switch (@typeInfo(T)) { + .Array => |a| a.len, + else => 1, + }; +} + +fn diagonalize(x: State, times: isize) State { + var out: State = x; + for (0..4) |i| { + out.b[i] = rotateLeft(x.b[i], 1 * times); + out.c[i] = rotateLeft(x.c[i], 2 * times); + out.d[i] = rotateLeft(x.d[i], 3 * times); + } + return out; +} + +/// Rotates array items to different locations in the array. +fn rotateLeft(item: [4]u32, n: isize) [4]u32 { + return .{ + item[mod(n, 4)], + item[mod((n + 1), 4)], + item[mod((n + 2), 4)], + item[mod((n + 3), 4)], + }; +} + +fn mod(n: isize, len: usize) usize { + return @intCast(std.math.mod(isize, n, @intCast(len)) catch unreachable); +} + +test "Random.int(u32) works" { + const chacha = ChaCha(20).init(.{0} ** 32, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + const random = rng.random(); + try std.testing.expect(2917185654 == random.int(u32)); +} + +test "Random.int(u64) works" { + const chacha = ChaCha(20).init(.{0} ** 32, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + const random = rng.random(); + try std.testing.expect(10393729187455219830 == random.int(u64)); +} + +test "Random.bytes works" { + const chacha = ChaCha(20).init(.{0} ** 32, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + const random = rng.random(); + var dest: [32]u8 = undefined; + const midpoint = .{ + 118, 184, 224, 173, 160, 241, 61, 144, 64, 93, 106, 229, 83, 134, 189, 40, 189, 210, + 25, 184, 160, 141, 237, 26, 168, 54, 239, 204, 139, 119, 13, 199, + }; + random.bytes(&dest); + try std.testing.expect(mem.eql(u8, &midpoint, &dest)); +} + +test "recursive fill" { + var bytes: [32]u8 = .{0} ** 32; + var rng_init = ChaChaRng(20).fromSeed(bytes); + rng_init.random().bytes(&bytes); + const chacha = ChaCha(20).init(bytes, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + rng.fill(&bytes); + + const expected = .{ + 176, 253, 20, 255, 150, 160, 189, 161, 84, 195, 41, 8, 44, 156, 101, 51, 187, + 76, 148, 115, 191, 93, 222, 19, 143, 130, 201, 172, 85, 83, 217, 88, + }; + try std.testing.expect(mem.eql(u8, &expected, &bytes)); +} + +test "dynamic next int works" { + var bytes: [32]u8 = .{0} ** 32; + var rng_init = ChaChaRng(20).fromSeed(bytes); + rng_init.random().bytes(&bytes); + const chacha = ChaCha(20).init(bytes, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + const u32s = [_]u32{ + 4279565744, 862297132, 2898887311, 3678189893, 3874939098, 1553983382, 1031206440, + 978567423, 4209765794, 2063739027, 3497840189, 3042885724, 13559713, 2804739726, + 83427940, 1888646802, 2860787473, 1877744140, 3871387528, 2786522908, 315930854, + 120980593, 3002074910, 3285478202, 1586689760, 2340124627, 52115417, 2748045760, + 3357889967, 214072547, 1511164383, 1921839307, 842278728, 1023471299, 3744819639, + 4085269185, 3222055698, 1508829632, 3587328034, 451202787, 3647660313, 3102981063, + 3964799389, 3904121230, 2805919233, 2118987761, 3557954211, 3320127761, 
2756534424, + 992375503, 3545628137, 1085584675, 1209223666, 2255867162, 1635202960, 2496462192, + 713473244, 1792112125, 3844522849, 2490299132, 4072683334, 70142460, 2095023485, + 461018663, 3859958840, 212748047, 2657724434, 81297974, 3942098154, 958741438, + 346419548, 2225828352, 2900251414, 336469631, 654063680, 1812174127, 609007208, + 846863059, 3189927372, 1905581022, 2172277675, 4037927613, 3495064163, 3874649746, + 3559563381, 590810202, 2664210773, 3223769241, 2040745611, 360514407, 2919944502, + 536370302, 1065703962, 7253915, 337567527, 1460887337, 1474807598, 1848190485, + 4096711861, 3404804800, + }; + const u64s = [_]u64{ + 588215994606788758, 1431684808409631931, 2931207493295625045, 3032891578644758194, + 418541299932196982, 15396241028896397909, 12835735727689995230, 9873538067654336105, + 12514793613430075092, 13232023512861211064, 16028062863378687135, 16967702477157236558, + 2887555945435226072, 17400462721248040447, 17117735807058458868, 15659305100245141846, + 2699089690138758189, 10755240647284155175, 1924095871294250220, 17515951820211362774, + 13373595865079936501, 6860971170011447095, 14703706923713349358, 11533069247885725721, + 3448216242831738015, 9278269248351279695, 9372255405263360037, 8707387524018776887, + 8746597495079144985, 7691371180483864508, 7537003725416187104, 1981019672903425841, + 10056652572362307735, 2436364459124478962, 2428925607328334081, 14712031039183662158, + 2614237173466617322, 4257610326057511672, 3540403114074859660, 6581767110215406295, + 15150451542146080734, 181278900145439701, 11760969932321600702, 17522913230875340068, + 10318893824576666810, 18312828410504980228, 2805875854392392082, 5355795946829941939, + 7515894275194643237, 9702265981800844421, 227603388627345368, 3324436570698804108, + 4753191896749056049, 17885086500265945805, 17435295308389799126, 5786986546027884036, + 17350667365223054483, 1154396925486892856, 5844933381342596954, 9570272635503767656, + 16336838788699700779, 2336639497643599348, 9795949699684750554, 6329973578295938791, + 15992525826554723486, 17793526484350803500, 13898491381782030824, 4397579918151967336, + 17917727240936500825, 7352683368508344350, 11766507471434633205, 9634720798753459106, + 16282012887761187213, 16324707443307008843, 14425283330535396682, 13172406095143567691, + 2691725161073047006, 1406030345077942778, 9684222056303881176, 9746143945091321583, + 8181709559804695063, 1654050647849141241, 18149780750595962095, 8493844361058276091, + 9446739672321797014, 12390809841934868939, 15188448811864282367, 98895932768533343, + 5024754166561341894, 9730267002865676284, 11893802928445802006, 18309480227270911117, + 17066717792185926269, 13499718013438346758, 5217404074882333630, 12694155839474838416, + 3008502677940577076, 11542601400063272771, 1730084963375478886, 1114921244491478328, + }; + var random = rng.random(); + for (0..100) |i| { + try std.testing.expect(u32s[i] == random.int(u32)); + try std.testing.expect(u64s[i] == random.int(u64)); + } +} + +test "rng works" { + const chacha = ChaCha(20).init(.{0} ** 32, .{0} ** 12); + var rng = BlockRng(ChaCha(20), ChaCha(20).generate){ .core = chacha }; + var dest: [32]u8 = undefined; + const midpoint = .{ + 118, 184, 224, 173, 160, 241, 61, 144, 64, 93, 106, 229, 83, 134, 189, 40, 189, 210, + 25, 184, 160, 141, 237, 26, 168, 54, 239, 204, 139, 119, 13, 199, + }; + rng.fill(&dest); + // assert_eq!(midpoint, dest); + try std.testing.expect(mem.eql(u8, &midpoint, &dest)); +} + +test "ChaCha works" { + var chacha = ChaCha(20){ + .b = .{ 1, 2, 3, 
4 }, + .c = .{ 5, 6, 7, 8 }, + .d = .{ 9, 10, 11, 12 }, + }; + var out: [64]u32 = .{0} ** 64; + chacha.generate(&out); + const expected1 = .{ + 514454965, 2343183702, 485828088, 2392727011, 3682321578, 3166467596, 1535089427, + 266038024, 1861812015, 3818141583, 486852448, 277812666, 1961317633, 3870259557, + 3811097870, 10333140, 3471107314, 854767140, 1292362001, 1791493576, 684928595, + 2735203077, 3103536681, 1555264764, 2953779204, 1335099419, 3308039343, 3071159758, + 676902921, 3409736680, 289978712, 198159109, 4106483464, 4193260066, 389599996, + 1248502515, 607568078, 3047265466, 2254027974, 3837112036, 2647654845, 3933149571, + 251366014, 192741632, 4239604811, 2829206891, 2090618058, 86120867, 3489155609, + 162839505, 3738605468, 1369674854, 3501711964, 3507855056, 3021042483, 747171775, + 3095039326, 1302941762, 1534526601, 4269591531, 2416037718, 2139104272, 3631556128, + 4065100274, + }; + try std.testing.expect(mem.eql(u32, &expected1, &out)); +} + +/// for testing +const test_start_state = State{ + .a = .{ .{ 0, 1, 2, 3 }, .{ 4, 5, 6, 7 }, .{ 8, 9, 10, 11 }, .{ 12, 13, 14, 15 } }, + .b = .{ + .{ 16, 17, 18, 19 }, + .{ 20, 21, 22, 23 }, + .{ 24, 25, 26, 27 }, + .{ 28, 29, 30, 31 }, + }, + .c = .{ + .{ 32, 33, 34, 35 }, + .{ 36, 37, 38, 39 }, + .{ 40, 41, 42, 43 }, + .{ 44, 45, 46, 47 }, + }, + .d = .{ + .{ 48, 49, 50, 51 }, + .{ 52, 53, 54, 55 }, + .{ 56, 57, 58, 59 }, + .{ 60, 61, 62, 63 }, + }, +}; + +test "d0123 works" { + const input = .{ 1, 2, 3, 4 }; + const expected_out: [4][4]u32 = .{ + .{ 1, 2, 3, 4 }, + .{ 2, 2, 3, 4 }, + .{ 3, 2, 3, 4 }, + .{ 4, 2, 3, 4 }, + }; + const output = repeat4timesAndAdd0123(input); + for (0..4) |i| { + try std.testing.expect(mem.eql(u32, &expected_out[i], &output[i])); + } +} + +test "diagonalize round trip" { + const mid = diagonalize(test_start_state, 1); + const expected_mid = State{ + .a = .{ + .{ 0, 1, 2, 3 }, + .{ 4, 5, 6, 7 }, + .{ 8, 9, 10, 11 }, + .{ 12, 13, 14, 15 }, + }, + .b = .{ + .{ 17, 18, 19, 16 }, + .{ 21, 22, 23, 20 }, + .{ 25, 26, 27, 24 }, + .{ 29, 30, 31, 28 }, + }, + .c = .{ + .{ 34, 35, 32, 33 }, + .{ 38, 39, 36, 37 }, + .{ 42, 43, 40, 41 }, + .{ 46, 47, 44, 45 }, + }, + .d = .{ + .{ 51, 48, 49, 50 }, + .{ 55, 52, 53, 54 }, + .{ 59, 56, 57, 58 }, + .{ 63, 60, 61, 62 }, + }, + }; + try std.testing.expect(expected_mid.eql(&mid)); + const end = diagonalize(mid, -1); + try std.testing.expect(test_start_state.eql(&end)); +} + +test "round works" { + const expected = State{ + .a = .{ + .{ 196626, 805502996, 1610809366, 1342373912 }, + .{ 3221422106, 4026728476, 2684551198, 2416115744 }, + .{ 2147680289, 2952986659, 3758293029, 3489857575 }, + .{ 1073938473, 1879244843, 537067565, 268632111 }, + }, + .b = .{ + .{ 2441679121, 269101448, 2458599458, 319568059 }, + .{ 2542629751, 370052078, 2492424772, 353393373 }, + .{ 2375079117, 202501204, 2391999998, 252968295 }, + .{ 2341779115, 169201202, 2291574680, 152542977 }, + }, + .c = .{ + .{ 589304352, 539169873, 623253122, 639965299 }, + .{ 791419620, 741285141, 690626246, 707338423 }, + .{ 454566312, 404431833, 488515082, 505227259 }, + .{ 387197292, 337062813, 286403918, 303116095 }, + }, + .d = .{ + .{ 587207168, 536876080, 620762720, 637540432 }, + .{ 788536000, 738204912, 687873696, 704651408 }, + .{ 452993408, 402662320, 486548960, 503326672 }, + .{ 385886528, 335555440, 285224224, 302001936 }, + }, + }; + try std.testing.expect(expected.eql(&round(test_start_state))); +} + +test "bitcast works as vec128" { + const gas = [4]u32{ + std.math.maxInt(u32) / 2, + 
std.math.maxInt(u32) / 5, + std.math.maxInt(u32) / 7, + std.math.maxInt(u32) / 11, + }; + const liquid: [2]u64 = @bitCast(gas); + const solid: u128 = @bitCast(gas); + const expected_liquid: [2]u64 = .{ 3689348816030400511, 1676976733025356068 }; + const expected_solid = 30934760611684291960695475747055206399; + try std.testing.expect(mem.eql(u64, &expected_liquid, &liquid)); + try std.testing.expect(expected_solid == solid); + try std.testing.expect(mem.eql(u32, &gas, &@as([4]u32, @bitCast(liquid)))); + try std.testing.expect(mem.eql(u32, &gas, &@as([4]u32, @bitCast(solid)))); +} + +test "rotate_right" { + const start = [4]u32{ 16, 17, 18, 19 }; + inline for (.{ + .{ 0, .{ 16, 17, 18, 19 } }, + .{ 1, .{ 8, 2147483656, 9, 2147483657 } }, + .{ 16, .{ 1048576, 1114112, 1179648, 1245184 } }, + .{ 29, .{ 128, 136, 144, 152 } }, + .{ 64, .{ 16, 17, 18, 19 } }, + }) |x| { + const n, const expected = x; + inline for (0..4) |i| { + const start_item = start[i]; + // const right = n % 32; + // const left: u32 = (32 - right) % 32; + // const out = start_item << left | start_item >> @intCast(right); + try std.testing.expect(expected[i] == std.math.rotr(u32, start_item, n)); + } + } +} + +test "add_pos works" { + const input = .{ 1, 2, 3, 4 }; + const i = 1892390; + const output = wrappingAddToFirstHalf(input, i); + try std.testing.expect(mem.eql(u32, &[4]u32{ 1892391, 2, 3, 4 }, &output)); +} + +test "transpose works" { + const input = [4][4][4]u32{ + .{ .{ 0, 1, 2, 3 }, .{ 4, 5, 6, 7 }, .{ 8, 9, 10, 11 }, .{ 12, 13, 14, 15 } }, + .{ .{ 16, 17, 18, 19 }, .{ 20, 21, 22, 23 }, .{ 24, 25, 26, 27 }, .{ 28, 29, 30, 31 } }, + .{ .{ 32, 33, 34, 35 }, .{ 36, 37, 38, 39 }, .{ 40, 41, 42, 43 }, .{ 44, 45, 46, 47 } }, + .{ .{ 48, 49, 50, 51 }, .{ 52, 53, 54, 55 }, .{ 56, 57, 58, 59 }, .{ 60, 61, 62, 63 } }, + }; + const actual = transpose4(input); + const expected: [4][4][4]u32 = .{ + .{ .{ 0, 1, 2, 3 }, .{ 16, 17, 18, 19 }, .{ 32, 33, 34, 35 }, .{ 48, 49, 50, 51 } }, + .{ .{ 4, 5, 6, 7 }, .{ 20, 21, 22, 23 }, .{ 36, 37, 38, 39 }, .{ 52, 53, 54, 55 } }, + .{ .{ 8, 9, 10, 11 }, .{ 24, 25, 26, 27 }, .{ 40, 41, 42, 43 }, .{ 56, 57, 58, 59 } }, + .{ .{ 12, 13, 14, 15 }, .{ 28, 29, 30, 31 }, .{ 44, 45, 46, 47 }, .{ 60, 61, 62, 63 } }, + }; + for (0..4) |i| for (0..4) |j| { + try std.testing.expect(mem.eql(u32, &expected[i][j], &actual[i][j])); + }; +} diff --git a/src/rand/rand.zig b/src/rand/rand.zig new file mode 100644 index 000000000..54ca2673c --- /dev/null +++ b/src/rand/rand.zig @@ -0,0 +1,151 @@ +const std = @import("std"); +const sig = @import("../lib.zig"); +const chacha = @import("chacha.zig"); + +const Allocator = std.mem.Allocator; +const Random = std.Random; + +pub const ChaCha = chacha.ChaCha; +pub const ChaChaRng = chacha.ChaChaRng; + +/// Uniformly samples a collection of weighted items. This struct only deals with +/// the weights, and it tells you which index it selects. +/// +/// This deterministically selects the same sequence of items as WeightedIndex +/// from the rust crate rand, assuming you use a compatible pseudo-random +/// number generator. +/// +/// Each index's probability of being selected is the ratio of its weight to the +/// sum of all weights. +/// +/// For example, for the weights [1, 3, 2], the probability of `sample` returning +/// each index is: +/// 0. -> 1/6 +/// 1. -> 1/2 +/// 2.
-> 1/3 +pub fn WeightedRandomSampler(comptime uint: type) type { + return struct { + allocator: Allocator, + random: Random, + cumulative_weights: []const uint, + total: uint, + + const Self = @This(); + + pub fn init( + allocator: Allocator, + random: Random, + weights: []const uint, + ) Allocator.Error!Self { + var cumulative_weights: []uint = try allocator.alloc(uint, weights.len); + var total: uint = 0; + for (0..weights.len) |i| { + total += weights[i]; + cumulative_weights[i] = total; + } + return .{ + .allocator = allocator, + .random = random, + .cumulative_weights = cumulative_weights, + .total = total, + }; + } + + pub fn deinit(self: Self) void { + self.allocator.free(self.cumulative_weights); + } + + /// Returns the index of the selected item + pub fn sample(self: *const Self) uint { + const want = self.random.uintLessThan(uint, self.total); + var lower: usize = 0; + var upper: usize = self.cumulative_weights.len - 1; + var guess = upper / 2; + for (0..self.cumulative_weights.len) |_| { + if (self.cumulative_weights[guess] >= want) { + upper = guess; + } else { + lower = guess + 1; + } + if (upper == lower) { + return upper; + } + guess = lower + (upper - lower) / 2; + } + unreachable; + } + }; +} + +/// Wrapper for random number generators which generate blocks of [64]u32. +/// Minimizes calls to the underlying random number generator by recycling unused +/// data from previous calls. Port of BlockRng from rust which ensures the same +/// sequence is generated. +pub fn BlockRng( + comptime T: type, + comptime generate: fn (*T, *[64]u32) void, +) type { + return struct { + results: [64]u32 = undefined, + index: usize = 64, + core: T, + + const Self = @This(); + + pub fn random(self: *Self) Random { + return Random.init(self, fill); + } + + pub fn fill(self: *Self, dest: []u8) void { + var completed_bytes: usize = 0; + while (completed_bytes < dest.len) { + if (self.index >= self.results.len) { + generate(&self.core, &self.results); + self.index = 0; + } + const src: [*]u8 = @ptrCast(self.results[self.index..].ptr); + const num_u8s = @min(4 * (64 - self.index), dest.len - completed_bytes); + @memcpy(dest[completed_bytes..][0..num_u8s], src[0..num_u8s]); + + self.index += (num_u8s + 3) / 4; + completed_bytes += num_u8s; + } + } + }; +} + +test "WeightedRandomSampler matches rust with chacha" { + // generate data + var rng = chacha.ChaChaRng(20).fromSeed(.{0} ** 32); + var random = rng.random(); + var items: [100]u64 = undefined; + for (0..100) |i| { + items[i] = @intCast(random.int(u32)); + } + + // run test + const idx = try WeightedRandomSampler(u64).init(std.testing.allocator, random, &items); + defer idx.deinit(); + for (0..100) |i| { + const choice = items[idx.sample()]; + try std.testing.expect(expected_weights[i] == choice); + } +} + +const expected_weights = [_]u64{ + 2956161493, 1129244316, 3088700093, 3781961315, 3373288848, 3202811807, 3373288848, + 3848953152, 2448479257, 3848953152, 772637944, 3781961315, 2813985970, 3612365086, + 1651635039, 2419978656, 1300932346, 3678279626, 683509331, 3612365086, 2086224346, + 3678279626, 3328365435, 3230977993, 2115397425, 3478228973, 2687045579, 3438229160, + 1973446681, 3373288848, 2419978656, 4248444456, 1867348299, 4064846400, 3678279626, + 4064846400, 3373288848, 3373288848, 2240211114, 3678279626, 1300932346, 2254827186, + 3848953152, 1867348299, 1194017814, 2254827186, 3373288848, 1651635039, 3328365435, + 3202811807, 3848953152, 2370328401, 3230977993, 2050511189, 2917185654, 3612365086, + 2576249230, 3438229160, 
2866421973, 3438229160, 3612365086, 1669812906, 1768285000, + 877052848, 3755235835, 1651635039, 1931970043, 2813985970, 3781961315, 1004543717, + 2702218887, 2419978656, 2576249230, 2229903491, 4248444456, 3984256562, 4248444456, + 3339548555, 2576249230, 3848953152, 1071654007, 4064846400, 772637944, 4248444456, + 2448479257, 2229903491, 4294454303, 2813985970, 2971532662, 147947182, 2370328401, + 1981921065, 3478228973, 1387042214, 3755235835, 3384151174, 2448479257, 1768285000, + 102030521, 1813932776, +}; diff --git a/src/shred_collector/service.zig b/src/shred_collector/service.zig index d5d6567d2..7c996ebb6 100644 --- a/src/shred_collector/service.zig +++ b/src/shred_collector/service.zig @@ -18,6 +18,7 @@ const Pubkey = sig.core.Pubkey; const RwMux = sig.sync.RwMux; const ServiceManager = sig.utils.service_manager.ServiceManager; const Slot = sig.core.Slot; +const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider; const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker; const RepairPeerProvider = shred_collector.repair_service.RepairPeerProvider; @@ -47,6 +48,7 @@ pub const ShredCollectorDependencies = struct { gossip_table_rw: *RwMux(GossipTable), /// Shared state that is read from gossip my_shred_version: *const Atomic(u16), + leader_schedule: SlotLeaderProvider, }; /// Start the Shred Collector. @@ -62,7 +64,7 @@ pub fn start( conf: ShredCollectorConfig, deps: ShredCollectorDependencies, ) !ServiceManager { - var service_manager = ServiceManager.init(deps.allocator, deps.logger, deps.exit); + var service_manager = ServiceManager.init(deps.allocator, deps.logger, deps.exit, "shred collector", .{}, .{}); var arena = service_manager.arena(); const repair_socket = try bindUdpReusable(conf.repair_port); @@ -82,14 +84,20 @@ pub fn start( .unverified_shred_sender = unverified_shred_channel, .shred_version = deps.my_shred_version, .metrics = try ShredReceiverMetrics.init(), + .root_slot = if (conf.start_slot) |s| s - 1 else 0, }; - try service_manager.spawn(.{ .name = "Shred Receiver" }, ShredReceiver.run, .{shred_receiver}); + try service_manager.spawn("Shred Receiver", ShredReceiver.run, .{shred_receiver}); // verifier (thread) try service_manager.spawn( - .{ .name = "Shred Verifier" }, + "Shred Verifier", shred_collector.shred_verifier.runShredVerifier, - .{ deps.exit, unverified_shred_channel, verified_shred_channel, .{} }, + .{ + deps.exit, + unverified_shred_channel, + verified_shred_channel, + deps.leader_schedule, + }, ); // tracker (shared state, internal to Shred Collector) @@ -101,7 +109,7 @@ pub fn start( // processor (thread) try service_manager.spawn( - .{ .name = "Shred Processor" }, + "Shred Processor", shred_collector.shred_processor.runShredProcessor, .{ deps.allocator, deps.exit, deps.logger, verified_shred_channel, shred_tracker }, ); @@ -132,11 +140,7 @@ pub fn start( repair_peer_provider, shred_tracker, ); - try service_manager.spawn( - .{ .name = "Repair Service" }, - RepairService.run, - .{repair_svc}, - ); + try service_manager.spawn("Repair Service", RepairService.run, .{repair_svc}); return service_manager; } diff --git a/src/shred_collector/shred.zig b/src/shred_collector/shred.zig index c7b5604be..1783f438d 100644 --- a/src/shred_collector/shred.zig +++ b/src/shred_collector/shred.zig @@ -219,47 +219,17 @@ pub fn GenericShred( } } - /// TODO should this be memoized? 
- fn capacity(proof_size: u8, chained: bool, resigned: bool) !usize { - std.debug.assert(chained or !resigned); - return checkedSub( - constants.payload_size, - constants.headers_size + - if (chained) SIZE_OF_MERKLE_ROOT else 0 + - proof_size * merkle_proof_entry_size + - if (resigned) SIGNATURE_LENGTH else 0, - ) catch error.InvalidProofSize; - } - /// The return contains a pointer to data owned by the shred. fn merkleProof(self: *const Self) !MerkleProofEntryList { - const size = self.common.shred_variant.proof_size * merkle_proof_entry_size; - const offset = try self.proofOffset(); - const end = offset + size; - if (self.payload.len < end) { - return error.InsufficentPayloadSize; - } - return .{ - .bytes = self.payload[offset..end], - .len = self.common.shred_variant.proof_size, - }; - } - - // Where the merkle proof starts in the shred binary. - fn proofOffset(self: *const Self) !usize { - const v = self.common.shred_variant; - return constants.headers_size + - try capacity(v.proof_size, v.chained, v.resigned) + - if (v.chained) SIZE_OF_MERKLE_ROOT else 0; + return getMerkleProof(self.payload, constants, self.common.shred_variant); } fn erasureShardAsSlice(self: *const Self) ![]u8 { if (self.payload.len() != self.constants().payload_size) { return error.InvalidPayloadSize; } - const variant = self.common.shred_variant; const end = constants.headers_size + - try capacity(variant.proof_size, variant.chained, variant.resigned) + + try capacity(self.common.shred_variant) + SIGNATURE_LENGTH; if (self.payload.len < end) { return error.InsufficientPayloadSize; @@ -269,19 +239,148 @@ pub fn GenericShred( }; } +fn getMerkleRoot( + shred: []const u8, + constants: ShredConstants, + variant: ShredVariant, +) !Hash { + const index = switch (variant.shred_type) { + .Code => codeIndex(shred) orelse return error.InvalidErasureShardIndex, + .Data => dataIndex(shred) orelse return error.InvalidErasureShardIndex, + }; + const proof = try getMerkleProof(shred, constants, variant); + const offset = try proofOffset(constants, variant); + const node = try getMerkleNode(shred, SIGNATURE_LENGTH, offset); + return calculateMerkleRoot(index, node, proof); +} + +fn getMerkleProof( + shred: []const u8, + constants: ShredConstants, + variant: ShredVariant, +) !MerkleProofEntryList { + const size = variant.proof_size * merkle_proof_entry_size; + const offset = try proofOffset(constants, variant); + const end = offset + size; + if (shred.len < end) { + return error.InsufficentPayloadSize; + } + return .{ + .bytes = shred[offset..end], + .len = variant.proof_size, + }; +} + +fn getMerkleNode(shred: []const u8, start: usize, end: usize) !Hash { + if (shred.len < end) return error.InvalidPayloadSize; + return hashv(&.{ MERKLE_HASH_PREFIX_LEAF, shred[start..end] }); +} + +/// [get_merkle_root](https://github.com/anza-xyz/agave/blob/ed500b5afc77bc78d9890d96455ea7a7f28edbf9/ledger/src/shred/merkle.rs#L702) +fn calculateMerkleRoot(start_index: usize, start_node: Hash, proof: MerkleProofEntryList) !Hash { + var index = start_index; + var node = start_node; + var iterator = proof.iterator(); + while (iterator.next()) |other| { + node = if (index % 2 == 0) + joinNodes(&node.data, &other) + else + joinNodes(&other, &node.data); + index = index >> 1; + } + if (index != 0) return error.InvalidMerkleProof; + return node; +} + +const MERKLE_HASH_PREFIX_LEAF: *const [26]u8 = "\x00SOLANA_MERKLE_SHREDS_LEAF"; +const MERKLE_HASH_PREFIX_NODE: *const [26]u8 = "\x01SOLANA_MERKLE_SHREDS_NODE"; + +fn joinNodes(lhs: []const u8, rhs: []const 
u8) Hash { + // TODO check + return hashv(&.{ + MERKLE_HASH_PREFIX_NODE, + lhs[0..merkle_proof_entry_size], + rhs[0..merkle_proof_entry_size], + }); +} + +pub fn hashv(vals: []const []const u8) Hash { + var hasher = std.crypto.hash.sha2.Sha256.init(.{}); + for (vals) |val| hasher.update(val); + return .{ .data = hasher.finalResult() }; +} + +/// Where the merkle proof starts in the shred binary. +fn proofOffset(constants: ShredConstants, variant: ShredVariant) !usize { + return constants.headers_size + + try capacity(constants, variant) + + if (variant.chained) SIZE_OF_MERKLE_ROOT else 0; +} + +fn capacity(constants: ShredConstants, variant: ShredVariant) !usize { + std.debug.assert(variant.chained or !variant.resigned); + return checkedSub( + constants.payload_size, + constants.headers_size + + if (variant.chained) SIZE_OF_MERKLE_ROOT else 0 + + variant.proof_size * merkle_proof_entry_size + + if (variant.resigned) SIGNATURE_LENGTH else 0, + ) catch error.InvalidProofSize; +} + +/// Shred index in the erasure batch. +/// This only works for coding shreds. +fn codeIndex(shred: []const u8) ?usize { + const num_data_shreds: usize = @intCast(getInt(u16, shred, 83) orelse return null); + const position: usize = @intCast(getInt(u16, shred, 87) orelse return null); + return checkedAdd(num_data_shreds, position) catch null; +} + +/// Shred index in the erasure batch +/// This only works for data shreds. +fn dataIndex(shred: []const u8) ?usize { + const fec_set_index = getInt(u32, shred, 79) orelse return null; + const layout_index = layout.getIndex(shred) orelse return null; + const index = checkedSub(layout_index, fec_set_index) catch return null; + return @intCast(index); +} + const MerkleProofEntry = [merkle_proof_entry_size]u8; const merkle_proof_entry_size: usize = 20; +const MerkleProofIterator = Iterator(MerkleProofEntryList, MerkleProofEntry); + +pub fn Iterator(comptime Collection: type, comptime Item: type) type { + return struct { + list: Collection, + index: usize, + + pub fn next(self: *@This()) ?Item { + if (self.index >= self.list.len) { + return null; + } + defer self.index += 1; + return self.list.get(self.index); + } + }; +} + /// This is a reference. It does not own the data. Be careful with its lifetime. 
const MerkleProofEntryList = struct { bytes: []const u8, len: usize, - pub fn get(self: *@This(), index: usize) error{IndexOutOfBounds}!MerkleProofEntry { - if (index > self.len) return error.IndexOutOfBounds; + pub fn get(self: *@This(), index: usize) ?MerkleProofEntry { + if (index > self.len) return null; const start = index * merkle_proof_entry_size; const end = start + merkle_proof_entry_size; - return self.bytes[start..end]; + var entry: MerkleProofEntry = undefined; + @memcpy(&entry, self.bytes[start..end]); + return entry; + } + + pub fn iterator(self: @This()) MerkleProofIterator { + return .{ .list = self, .index = 0 }; } }; @@ -481,9 +580,11 @@ pub const layout = struct { pub fn getSignedData(shred: []const u8) ?Hash { const variant = getShredVariant(shred) orelse return null; - _ = variant; - // TODO implement this once the leader schedule is available to runShredSigVerify - return Hash.default(); + const constants = switch (variant.shred_type) { + .Code => coding_shred, + .Data => data_shred, + }; + return getMerkleRoot(shred, constants, variant) catch null; } /// must be a data shred, otherwise the return value will be corrupted and meaningless @@ -491,21 +592,21 @@ pub const layout = struct { std.debug.assert(getShredVariant(shred).?.shred_type == .Data); return getInt(u16, shred, 83); } - - /// Extracts a little-endian integer from within the slice, - /// starting at start_index. - fn getInt( - comptime Int: type, - data: []const u8, - start_index: usize, - ) ?Int { - const end_index = start_index + @sizeOf(Int); - if (data.len < end_index) return null; - const bytes: *const [@sizeOf(Int)]u8 = @ptrCast(data[start_index..end_index]); - return std.mem.readInt(Int, bytes, .little); - } }; +/// Extracts a little-endian integer from within the slice, +/// starting at start_index. 
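+///
+/// For example, getInt(u16, &[_]u8{ 0x34, 0x12, 0xFF }, 0) reads the first two bytes
+/// as little-endian and returns 0x1234.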
+fn getInt( + comptime Int: type, + data: []const u8, + start_index: usize, +) ?Int { + const end_index = start_index + @sizeOf(Int); + if (data.len < end_index) return null; + const bytes: *const [@sizeOf(Int)]u8 = @ptrCast(data[start_index..end_index]); + return std.mem.readInt(Int, bytes, .little); +} + test "basic shred variant round trip" { try testShredVariantRoundTrip(0x4C, .{ .shred_type = .Code, @@ -526,3 +627,65 @@ fn testShredVariantRoundTrip(expected_byte: u8, start_variant: ShredVariant) !vo start_variant.resigned == end_variant.resigned, ); } + +test "getShredVariant" { + const variant = layout.getShredVariant(&test_data_shred).?; + try std.testing.expect(.Data == variant.shred_type); + try std.testing.expect(!variant.chained); + try std.testing.expect(!variant.resigned); + try std.testing.expect(6 == variant.proof_size); +} + +test "dataIndex" { + try std.testing.expect(31 == dataIndex(&test_data_shred).?); +} + +test "getIndex" { + try std.testing.expect(65 == layout.getIndex(&test_data_shred).?); +} + +test "getMerkleRoot" { + const variant = layout.getShredVariant(&test_data_shred).?; + const merkle_root = try getMerkleRoot(&test_data_shred, data_shred, variant); + const expected_signed_data = [_]u8{ + 224, 241, 85, 253, 247, 62, 137, 179, 152, 192, 186, 203, 121, 194, 178, 130, + 33, 181, 143, 156, 220, 150, 69, 197, 81, 97, 237, 11, 74, 156, 129, 134, + }; + try std.testing.expect(std.mem.eql(u8, &expected_signed_data, &merkle_root.data)); +} + +test "getSignature" { + const signature = layout.getSignature(&test_data_shred).?; + const expected_signature = [_]u8{ + 102, 205, 108, 67, 218, 3, 214, 186, 28, 110, 167, 22, 75, 135, 233, 156, 45, 215, 209, 1, + 253, 53, 142, 52, 6, 98, 158, 51, 157, 207, 190, 22, 96, 106, 68, 248, 244, 162, 13, 205, + 193, 194, 143, 192, 142, 141, 134, 85, 93, 252, 43, 200, 224, 101, 12, 28, 97, 202, 230, 215, + 34, 217, 20, 7, + }; + try std.testing.expect(std.mem.eql(u8, &expected_signature, &signature.data)); +} + +test "getSignedData" { + const signed_data = layout.getSignedData(&test_data_shred).?; + const expected_signed_data = [_]u8{ + 224, 241, 85, 253, 247, 62, 137, 179, 152, 192, 186, 203, 121, 194, 178, 130, + 33, 181, 143, 156, 220, 150, 69, 197, 81, 97, 237, 11, 74, 156, 129, 134, + }; + try std.testing.expect(std.mem.eql(u8, &expected_signed_data, &signed_data.data)); +} + +const test_data_shred = [_]u8{ + 102, 205, 108, 67, 218, 3, 214, 186, 28, 110, 167, 22, 75, 135, 233, 156, 45, 215, + 209, 1, 253, 53, 142, 52, 6, 98, 158, 51, 157, 207, 190, 22, 96, 106, 68, 248, + 244, 162, 13, 205, 193, 194, 143, 192, 142, 141, 134, 85, 93, 252, 43, 200, 224, 101, + 12, 28, 97, 202, 230, 215, 34, 217, 20, 7, 134, 105, 170, 47, 18, 0, 0, 0, + 0, 65, 0, 0, 0, 71, 176, 34, 0, 0, 0, 1, 0, 192, 88, +} ++ .{0} ** 996 ++ .{ + 247, 170, 109, 175, 191, 111, 108, 73, 56, 57, 34, 185, 81, 218, 60, 244, 53, 227, + 243, 72, 15, 175, 148, 58, 42, 0, 133, 246, 67, 118, 164, 221, 109, 136, 179, 199, + 15, 177, 139, 110, 105, 222, 165, 194, 78, 25, 172, 56, 165, 69, 28, 80, 215, 72, + 10, 21, 144, 236, 44, 107, 166, 65, 197, 164, 106, 113, 9, 68, 227, 37, 134, 158, + 192, 200, 22, 30, 244, 177, 106, 84, 161, 246, 35, 21, 26, 163, 104, 181, 13, 189, + 247, 250, 214, 101, 190, 52, 28, 152, 85, 9, 49, 168, 162, 199, 128, 242, 217, 219, + 71, 219, 72, 191, 107, 210, 46, 255, 206, 122, 234, 142, 229, 214, 240, 186, +}; diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index d8485df66..5d83674dd 100644 --- 
a/src/shred_collector/shred_processor.zig +++ b/src/shred_collector/shred_processor.zig @@ -25,10 +25,9 @@ pub fn runShredProcessor( ) !void { var buf = ArrayList(ArrayList(Packet)).init(allocator); var error_context = ErrorContext{}; - while (true) { + while (!exit.load(.unordered)) { try verified_shred_receiver.tryDrainRecycle(&buf); if (buf.items.len == 0) { - if (exit.load(.monotonic)) return; std.time.sleep(10 * std.time.ns_per_ms); continue; } @@ -43,7 +42,6 @@ pub fn runShredProcessor( }; }; } - if (exit.load(.monotonic)) return; } } diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index 8db46fcce..5828bf2a6 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -35,6 +35,7 @@ pub const ShredReceiver = struct { unverified_shred_sender: *Channel(ArrayList(Packet)), shred_version: *const Atomic(u16), metrics: ShredReceiverMetrics, + root_slot: Slot, // TODO: eventually, this should be handled by BankForks const Self = @This(); @@ -118,10 +119,8 @@ pub const ShredReceiver = struct { try self.handlePing(packet, responses); packet.flags.set(.discard); } else { - // TODO set correct values once using snapshot + blockstore - const root = 0; - const max_slot = std.math.maxInt(Slot); - if (shouldDiscardShred(packet, root, shred_version, max_slot)) { + const max_slot = std.math.maxInt(Slot); // TODO agave uses BankForks for this + if (shouldDiscardShred(packet, self.root_slot, shred_version, max_slot)) { packet.flags.set(.discard); } } diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index 0de37fd1a..2404a6520 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -8,6 +8,7 @@ const ArrayList = std.ArrayList; const Atomic = std.atomic.Value; const Channel = sig.sync.Channel; +const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider; const Packet = sig.net.Packet; /// Analogous to [run_shred_sigverify](https://github.com/anza-xyz/agave/blob/8c5a33a81a0504fd25d0465bed35d153ff84819f/turbine/src/sigverify_shreds.rs#L82) @@ -17,21 +18,20 @@ pub fn runShredVerifier( unverified_shred_receiver: *Channel(ArrayList(Packet)), /// me --> shred processor verified_shred_sender: *Channel(ArrayList(Packet)), - leader_schedule: LeaderScheduleCalculator, + leader_schedule: SlotLeaderProvider, ) !void { var verified_count: usize = 0; var buf = ArrayList(ArrayList(Packet)).init(unverified_shred_receiver.allocator); - while (true) { + while (!exit.load(.unordered)) { try unverified_shred_receiver.tryDrainRecycle(&buf); if (buf.items.len == 0) { - if (exit.load(.monotonic)) return; std.time.sleep(10 * std.time.ns_per_ms); continue; } for (buf.items) |packet_batch| { // TODO parallelize this once it's actually verifying signatures for (packet_batch.items) |*packet| { - if (!verifyShred(packet, &leader_schedule)) { + if (!verifyShred(packet, leader_schedule)) { packet.flags.set(.discard); } else { verified_count += 1; @@ -44,24 +44,14 @@ pub fn runShredVerifier( } /// verify_shred_cpu -fn verifyShred(packet: *const Packet, leader_schedule: *const LeaderScheduleCalculator) bool { +fn verifyShred(packet: *const Packet, leader_schedule: SlotLeaderProvider) bool { if (packet.flags.isSet(.discard)) return false; const shred = shred_layout.getShred(packet) orelse return false; const slot = shred_layout.getSlot(shred) orelse return false; const signature = shred_layout.getSignature(shred) orelse return false; const signed_data = 
shred_layout.getSignedData(shred) orelse return false; - // TODO: once implemented, this should no longer be optional - if (leader_schedule.getLeader(slot)) |leader| { - return signature.verify(leader, &signed_data.data); - } + const leader = leader_schedule.call(slot) orelse return false; - return true; + return signature.verify(leader, &signed_data.data); } - -// TODO -pub const LeaderScheduleCalculator = struct { - fn getLeader(_: *const @This(), _: sig.core.Slot) ?sig.core.Pubkey { - return null; - } -}; diff --git a/src/trace/log.zig b/src/trace/log.zig index 70c84118f..cf20ab799 100644 --- a/src/trace/log.zig +++ b/src/trace/log.zig @@ -90,6 +90,15 @@ pub const Logger = union(enum) { } } + pub fn logf(self: Self, level: Level, comptime fmt: []const u8, args: anytype) void { + switch (self) { + .standard => |logger| { + logger.logf(level, fmt, args); + }, + .noop => {}, + } + } + pub fn info(self: Self, msg: []const u8) void { switch (self) { .standard => |logger| { @@ -126,6 +135,15 @@ pub const Logger = union(enum) { } } + pub fn log(self: Self, level: Level, msg: []const u8) void { + switch (self) { + .standard => |logger| { + logger.log(level, msg); + }, + .noop => {}, + } + } + /// Can be used in tests to minimize the amount of logging during tests. pub const TEST_DEFAULT_LEVEL: Level = .warn; }; diff --git a/src/utils/closure.zig b/src/utils/closure.zig new file mode 100644 index 000000000..e8e0763c6 --- /dev/null +++ b/src/utils/closure.zig @@ -0,0 +1,31 @@ +/// Generically represents a function with two inputs: +/// 1. enclosed state that is passed on initialization (as a pointer). +/// 2. input that is passed at the call site. +/// +/// The enclosed state's type is abstracted with dynamic dispatch. +/// +/// Contains a pointer to data that is owned by another context. Ensure that +/// the lifetime of that data exceeds the lifetime of this struct. +pub fn PointerClosure(comptime Input: type, comptime Output: type) type { + return struct { + state: *anyopaque, + genericFn: *const fn (*anyopaque, Input) Output, + + const Self = @This(); + + pub fn init(state: anytype, getFn: fn (@TypeOf(state), Input) Output) Self { + return .{ + .state = @alignCast(@ptrCast(state)), + .genericFn = struct { + fn callGeneric(generic_state: *anyopaque, slot: Input) Output { + return getFn(@alignCast(@ptrCast(generic_state)), slot); + } + }.callGeneric, + }; + } + + pub fn call(self: Self, slot: Input) Output { + return self.genericFn(self.state, slot); + } + }; +} diff --git a/src/utils/lib.zig b/src/utils/lib.zig index 18848bc64..84ab4941c 100644 --- a/src/utils/lib.zig +++ b/src/utils/lib.zig @@ -1,4 +1,5 @@ pub const collections = @import("collections.zig"); +pub const closure = @import("closure.zig"); pub const bitflags = @import("bitflags.zig"); pub const directory = @import("directory.zig"); pub const lazy = @import("lazy.zig"); diff --git a/src/utils/service.zig b/src/utils/service.zig index c8c2dcd03..6c554e0c1 100644 --- a/src/utils/service.zig +++ b/src/utils/service.zig @@ -7,8 +7,9 @@ const ArenaAllocator = std.heap.ArenaAllocator; const ArrayList = std.ArrayList; const Atomic = std.atomic.Value; -const Logger = sig.trace.Logger; const Lazy = sig.utils.lazy.Lazy; +const Level = sig.trace.Level; +const Logger = sig.trace.Logger; /// High level manager for long-running threads and the state /// shared by those threads. @@ -25,16 +26,29 @@ pub const ServiceManager = struct { _arena: ArenaAllocator, /// Logic to run after all threads join. 
defers: DeferList, + name: []const u8, + default_run_config: RunConfig, + default_spawn_config: std.Thread.SpawnConfig, const Self = @This(); - pub fn init(allocator: Allocator, logger: Logger, exit: *Atomic(bool)) Self { + pub fn init( + allocator: Allocator, + logger: Logger, + exit: *Atomic(bool), + name: []const u8, + default_run_config: RunConfig, + default_spawn_config: std.Thread.SpawnConfig, + ) Self { return .{ .logger = logger, .exit = exit, .threads = ArrayList(std.Thread).init(allocator), ._arena = ArenaAllocator.init(allocator), .defers = DeferList.init(allocator), + .name = name, + .default_run_config = default_run_config, + .default_spawn_config = default_spawn_config, }; } @@ -50,19 +64,32 @@ pub const ServiceManager = struct { } /// Spawn a thread to be managed. - /// The function may be restarted periodically, according to the config. + /// The function may be restarted periodically, according to default_run_config. pub fn spawn( self: *Self, - config: RunConfig, + name: ?[]const u8, + comptime function: anytype, + args: anytype, + ) !void { + return self.spawnCustom(name, self.default_run_config, self.default_spawn_config, function, args); + } + + /// Spawn a thread to be managed. + /// The function may be restarted periodically, according to the provided config. + pub fn spawnCustom( + self: *Self, + maybe_name: ?[]const u8, + run_config: ?RunConfig, + spawn_config: std.Thread.SpawnConfig, comptime function: anytype, args: anytype, ) !void { var thread = try std.Thread.spawn( - .{}, + spawn_config, runService, - .{ self.logger, self.exit, config, function, args }, + .{ self.logger, self.exit, maybe_name, run_config orelse self.default_run_config, function, args }, ); - if (config.name) |name| thread.setName(name) catch {}; + if (maybe_name) |name| thread.setName(name) catch {}; try self.threads.append(thread); } @@ -76,20 +103,21 @@ pub const ServiceManager = struct { /// 2. Wait for threads to exit. /// 3. Deinit the shared state from those threads. pub fn deinit(self: Self) void { + self.logger.infof("Cleaning up: {s}", .{self.name}); self.exit.store(true, .monotonic); for (self.threads.items) |t| t.join(); self.threads.deinit(); self.defers.deinit(); self._arena.deinit(); + self.logger.infof("Finished cleaning up: {s}", .{self.name}); } }; pub const RunConfig = struct { - name: ?[]const u8 = null, /// what to do when the task returns without error - return_handler: ReturnHandler = .keep_looping, - /// what to do when the task returns with an error - error_handler: ReturnHandler = .keep_looping, + return_handler: ReturnHandler = .{}, + /// what to do when the task returns an error + error_handler: ReturnHandler = .{}, /// The minimum amount of time to spend on the entire loop, /// including the logic plus the pause. min_loop_duration_ns: u64 = 0, @@ -98,10 +126,17 @@ pub const RunConfig = struct { min_pause_ns: u64 = 0, }; -pub const ReturnHandler = enum { - keep_looping, - just_return, - set_exit_and_return, +pub const ReturnHandler = struct { + /// Loop the task until the return event occurs this many times. + /// null means an infinite loop. + max_iterations: ?u64 = null, + /// Whether to set the `exit` bool to true after max_iterations + /// is reached. + set_exit_on_completion: bool = false, + /// Whether to log after each return. + log_return: bool = true, + /// Whether to log when exiting on the final return. 
+ log_exit: bool = true, }; /// Convert a short-lived task into a long-lived service by looping it, @@ -109,12 +144,13 @@ pub const ReturnHandler = enum { pub fn runService( logger: Logger, exit: *Atomic(bool), + maybe_name: ?[]const u8, config: RunConfig, function: anytype, args: anytype, ) !void { var buf: [16]u8 = undefined; - const name = config.name orelse try std.fmt.bufPrint( + const name = maybe_name orelse try std.fmt.bufPrint( &buf, "thread {d}", .{std.Thread.getCurrentId()}, @@ -122,40 +158,46 @@ pub fn runService( logger.infof("Starting {s}", .{name}); var timer = try std.time.Timer.start(); var last_iteration: u64 = 0; + var num_oks: u64 = 0; + var num_errors: u64 = 0; while (!exit.load(.unordered)) { - if (@call(.auto, function, args)) |ok| { - switch (config.error_handler) { - .keep_looping => {}, - .just_return => { - logger.errf("Exiting {s} due to return", .{name}); - return ok; - }, - .set_exit_and_return => { - logger.errf("Signalling exit due to return from {s}", .{name}); - exit.store(true, .monotonic); - return ok; - }, - } - } else |err| { - switch (config.error_handler) { - .keep_looping => logger.errf("Unhandled error in {s}: {}", .{ name, err }), - .just_return => { - logger.errf("Exiting {s} due to error: {}", .{ name, err }); - return err; - }, - .set_exit_and_return => { - logger.errf("Signalling exit due to error in {s}: {}", .{ name, err }); - exit.store(true, .monotonic); - return err; - }, - } + const result = @call(.auto, function, args); + + // identify result + if (result) |_| num_oks += 1 else |_| num_errors += 1; + const handler, const num_events, const event_name, const level = if (result) |_| + .{ config.return_handler, num_oks, "return", Level.info } + else |_| + .{ config.error_handler, num_errors, "error", Level.err }; + + // handle result + if (handler.log_return) { + logger.logf(level, "{s} has {s}ed: {any}", .{ name, event_name, result }); } + if (handler.max_iterations) |max| if (num_events >= max) { + if (handler.set_exit_on_completion) { + if (handler.log_exit) logger.logf( + level, + "Signaling exit due to {} {s}s from {s}", + .{ num_events, event_name, name }, + ); + exit.store(true, .monotonic); + } else if (handler.log_exit) logger.logf( + level, + "Exiting {s} due to {} {s}s", + .{ name, num_events, event_name }, + ); + return result; + }; + + // sleep before looping, if necessary last_iteration = timer.lap(); std.time.sleep(@max( config.min_pause_ns, config.min_loop_duration_ns -| last_iteration, )); } + logger.infof("Exiting {s} because the exit signal was received.", .{name}); } /// Defer actions until later. diff --git a/test_data/.gitignore b/test_data/.gitignore new file mode 100644 index 000000000..968c68d77 --- /dev/null +++ b/test_data/.gitignore @@ -0,0 +1,13 @@ +# turn this file into a whitelist +* + +# the files to include in git +!.gitignore +!10 +!25 +!genesis.bin +!incremental-snapshot-10-25-GXgKvm3NMAPgGdv2verVaNXmKTHQgfy2TAxLVEfAvdCS.tar +!incremental-snapshot-10-25-GXgKvm3NMAPgGdv2verVaNXmKTHQgfy2TAxLVEfAvdCS.tar.zst +!snapshot-10-6ExseAZAVJsAZjhimxHTR7N8p6VGXiDNdsajYh1ipjAD.tar.zst +!status_cache +!test_account_file
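For reference, a minimal sketch of how a caller might wire a leader lookup into the SlotLeaderProvider plumbing used by runShredVerifier above. This is illustrative only: it assumes SlotLeaderProvider is defined as sig.utils.closure.PointerClosure(Slot, ?Pubkey) (the leader_schedule module itself is not shown in this diff), the import path is a placeholder, and StaticLeaderSchedule is a hypothetical stand-in for a real schedule.

const sig = @import("../lib.zig"); // import path is an assumption; adjust to the local module layout
const Slot = sig.core.Slot;
const Pubkey = sig.core.Pubkey;
const PointerClosure = sig.utils.closure.PointerClosure;

/// Hypothetical fixed-window schedule, used only to illustrate the closure wiring.
const StaticLeaderSchedule = struct {
    start_slot: Slot,
    leaders: []const Pubkey,

    fn getLeader(self: *StaticLeaderSchedule, slot: Slot) ?Pubkey {
        if (slot < self.start_slot) return null;
        const index: usize = @intCast(slot - self.start_slot);
        return if (index < self.leaders.len) self.leaders[index] else null;
    }
};

/// Wraps the schedule in the same closure shape that verifyShred consumes
/// via leader_schedule.call(slot).
fn slotLeaderProvider(schedule: *StaticLeaderSchedule) PointerClosure(Slot, ?Pubkey) {
    return PointerClosure(Slot, ?Pubkey).init(schedule, StaticLeaderSchedule.getLeader);
}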