diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index 2dd564c5a..f7f440de0 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -1,25 +1,28 @@ const std = @import("std"); +const zstd = @import("zstd"); +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; + const ArrayList = std.ArrayList; const HashMap = std.AutoHashMap; - -const _genesis_config = @import("genesis_config.zig"); -const UnixTimestamp = _genesis_config.UnixTimestamp; -const FeeRateGovernor = _genesis_config.FeeRateGovernor; -const EpochSchedule = _genesis_config.EpochSchedule; -const Rent = _genesis_config.Rent; -const Inflation = _genesis_config.Inflation; - -const Account = @import("../core/account.zig").Account; -const Hash = @import("../core/hash.zig").Hash; -const Slot = @import("../core/time.zig").Slot; -const Epoch = @import("../core/time.zig").Epoch; -const Pubkey = @import("../core/pubkey.zig").Pubkey; -const bincode = @import("../bincode/bincode.zig"); -const defaultArrayListOnEOFConfig = @import("../utils/arraylist.zig").defaultArrayListOnEOFConfig; -const readDirectory = @import("../utils/directory.zig").readDirectory; -pub const sysvars = @import("sysvars.zig"); -const ZstdReader = @import("zstd").Reader; -const parallelUntarToFileSystem = @import("../utils/tar.zig").parallelUntarToFileSystem; +const ZstdReader = zstd.Reader; + +const Account = sig.core.account.Account; +const Hash = sig.core.hash.Hash; +const Slot = sig.core.time.Slot; +const Epoch = sig.core.time.Epoch; +const Pubkey = sig.core.pubkey.Pubkey; +const UnixTimestamp = sig.accounts_db.genesis_config.UnixTimestamp; +const FeeRateGovernor = sig.accounts_db.genesis_config.FeeRateGovernor; +const EpochSchedule = sig.accounts_db.genesis_config.EpochSchedule; +const Rent = sig.accounts_db.genesis_config.Rent; +const Inflation = sig.accounts_db.genesis_config.Inflation; +const SlotHistory = sig.accounts_db.sysvars.SlotHistory; + +const defaultArrayListOnEOFConfig = bincode.arraylist.defaultArrayListOnEOFConfig; +const readDirectory = sig.utils.directory.readDirectory; +const parallelUntarToFileSystem = sig.utils.tar.parallelUntarToFileSystem; pub const MAXIMUM_ACCOUNT_FILE_SIZE: u64 = 16 * 1024 * 1024 * 1024; // 16 GiB pub const MAX_RECENT_BLOCKHASHES: usize = 300; @@ -739,7 +742,7 @@ pub const StatusCache = struct { self: *const StatusCache, allocator: std.mem.Allocator, bank_slot: Slot, - slot_history: *const sysvars.SlotHistory, + slot_history: *const SlotHistory, ) !void { // status cache validation const len = self.bank_slot_deltas.items.len; diff --git a/src/utils/arraylist.zig b/src/bincode/arraylist.zig similarity index 96% rename from src/utils/arraylist.zig rename to src/bincode/arraylist.zig index 2d28b7c41..731a7ca17 100644 --- a/src/utils/arraylist.zig +++ b/src/bincode/arraylist.zig @@ -1,5 +1,7 @@ const std = @import("std"); -const bincode = @import("../bincode/bincode.zig"); +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; pub fn ArrayListConfig(comptime Child: type) bincode.FieldConfig(std.ArrayList(Child)) { const S = struct { diff --git a/src/bincode/bincode.zig b/src/bincode/bincode.zig index 9a97ecb40..fae87b9e3 100644 --- a/src/bincode/bincode.zig +++ b/src/bincode/bincode.zig @@ -1,16 +1,13 @@ +pub const arraylist = @import("arraylist.zig"); +pub const shortvec = @import("shortvec.zig"); +pub const varint = @import("varint.zig"); + const std = @import("std"); const testing = std.testing; const bincode = @This(); -pub const config = struct { - // TODO move these files to the bincode folder - pub const arraylist = @import("../utils/arraylist.zig"); - pub const shortvec = @import("../utils/shortvec.zig"); - pub const varint = @import("../utils/varint.zig"); -}; - pub const Params = struct { pub const legacy: Params = .{ .endian = .little, @@ -706,7 +703,7 @@ test "bincode: custom enum" { } test "bincode: default on eof" { - const defaultArrayListOnEOFConfig = @import("../utils/arraylist.zig").defaultArrayListOnEOFConfig; + const defaultArrayListOnEOFConfig = @import("../lib.zig").bincode.arraylist.defaultArrayListOnEOFConfig; const Foo = struct { value: u8 = 0, accounts: std.ArrayList(u64), diff --git a/src/utils/shortvec.zig b/src/bincode/shortvec.zig similarity index 92% rename from src/utils/shortvec.zig rename to src/bincode/shortvec.zig index 97e0f89fc..fc674496d 100644 --- a/src/utils/shortvec.zig +++ b/src/bincode/shortvec.zig @@ -1,7 +1,10 @@ const std = @import("std"); -const bincode = @import("../bincode/bincode.zig"); -const serialize_short_u16 = @import("varint.zig").serialize_short_u16; -const deserialize_short_u16 = @import("varint.zig").deserialize_short_u16; +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; + +const serialize_short_u16 = sig.bincode.varint.serialize_short_u16; +const deserialize_short_u16 = sig.bincode.varint.deserialize_short_u16; pub fn ShortVecConfig(comptime Child: type) bincode.FieldConfig([]Child) { const S = struct { diff --git a/src/utils/varint.zig b/src/bincode/varint.zig similarity index 98% rename from src/utils/varint.zig rename to src/bincode/varint.zig index 9615b8a5c..3293e284c 100644 --- a/src/utils/varint.zig +++ b/src/bincode/varint.zig @@ -1,5 +1,7 @@ const std = @import("std"); -const bincode = @import("../bincode/bincode.zig"); +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; pub fn VarIntConfig(comptime VarInt: type) bincode.FieldConfig(VarInt) { const S = struct { diff --git a/src/bloom/bloom.zig b/src/bloom/bloom.zig index 5e37a8658..abeb7c3a6 100644 --- a/src/bloom/bloom.zig +++ b/src/bloom/bloom.zig @@ -1,16 +1,17 @@ const std = @import("std"); -const ArrayList = std.ArrayList; -const DynamicArrayBitSet = @import("bit_set.zig").DynamicArrayBitSet; -const BitVec = @import("bit_vec.zig").BitVec; -const ArrayListConfig = @import("../utils/arraylist.zig").ArrayListConfig; - -const bincode = @import("../bincode/bincode.zig"); -const BitVecConfig = @import("bit_vec.zig").BitVecConfig; +const sig = @import("../lib.zig"); -const FnvHasher = @import("../crypto/fnv.zig").FnvHasher; const testing = std.testing; +const bincode = sig.bincode; const RndGen = std.rand.DefaultPrng; +const ArrayList = std.ArrayList; + +const DynamicArrayBitSet = sig.bloom.bit_set.DynamicArrayBitSet; +const BitVec = sig.bloom.bit_vec.BitVec; +const BitVecConfig = sig.bloom.bit_vec.BitVecConfig; +const ArrayListConfig = bincode.arraylist.ArrayListConfig; +const FnvHasher = sig.crypto.FnvHasher; /// A bloom filter whose bitset is made up of u64 blocks pub const Bloom = struct { diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 4f31478e4..a1a0b6da8 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -211,7 +211,7 @@ var force_new_snapshot_download_option = cli.Option{ var snapshot_dir_option = cli.Option{ .long_name = "snapshot-dir", - .help = "path to snapshot directory (where snapshots are downloaded and/or unpacked to/from) - default: test_data/", + .help = "path to snapshot directory (where snapshots are downloaded and/or unpacked to/from) - default: ledger/accounts_db", .short_alias = 's', .value_ref = cli.mkRef(&config.current.accounts_db.snapshot_dir), .required = false, @@ -418,6 +418,9 @@ fn validator() !void { const repair_port: u16 = config.current.shred_collector.repair_port; const turbine_recv_port: u16 = config.current.shred_collector.repair_port; + const snapshot_dir_str = config.current.accounts_db.snapshot_dir; + + try std.fs.cwd().makePath(snapshot_dir_str); var gossip_service, var gossip_manager = try startGossip(allocator, &app_base, &.{ .{ .tag = .repair, .port = repair_port }, @@ -750,11 +753,11 @@ fn loadFromSnapshot( // cli parsing const snapshot_dir_str = config.current.accounts_db.snapshot_dir; - const n_cpus = @as(u32, @truncate(try std.Thread.getCpuCount())); - var n_threads_snapshot_load: u32 = @intCast(config.current.accounts_db.num_threads_snapshot_load); - if (n_threads_snapshot_load == 0) { - n_threads_snapshot_load = n_cpus; - } + const n_threads_snapshot_load: u32 = blk: { + const n_threads_snapshot_load: u32 = config.current.accounts_db.num_threads_snapshot_load; + if (n_threads_snapshot_load == 0) break :blk @truncate(try std.Thread.getCpuCount()); + break :blk n_threads_snapshot_load; + }; output.accounts_db = try AccountsDB.init( allocator, diff --git a/src/cmd/config.zig b/src/cmd/config.zig index b3ddc415b..f0eb394eb 100644 --- a/src/cmd/config.zig +++ b/src/cmd/config.zig @@ -36,7 +36,7 @@ pub const AccountsDBConfig = struct { // where to load/save snapshots from - also where disk indexes and account files are stored snapshot_dir: []const u8 = "ledger/accounts_db", // number of threads to load snapshot - num_threads_snapshot_load: u16 = 0, + num_threads_snapshot_load: u32 = 0, // number of threads to unpack snapshot from .tar.zstd num_threads_snapshot_unpack: u16 = 0, // number of shards to use across the index diff --git a/src/core/transaction.zig b/src/core/transaction.zig index 9b02b6845..73a0bec2c 100644 --- a/src/core/transaction.zig +++ b/src/core/transaction.zig @@ -1,8 +1,10 @@ const std = @import("std"); -const ShortVecConfig = @import("../utils/shortvec.zig").ShortVecConfig; -const Signature = @import("signature.zig").Signature; -const Pubkey = @import("pubkey.zig").Pubkey; -const Hash = @import("hash.zig").Hash; +const sig = @import("../lib.zig"); + +const Signature = sig.core.Signature; +const Pubkey = sig.core.Pubkey; +const Hash = sig.core.Hash; +const ShortVecConfig = sig.bincode.shortvec.ShortVecConfig; pub const Transaction = struct { signatures: []Signature, diff --git a/src/gossip/active_set.zig b/src/gossip/active_set.zig index 99d59130a..ce397120c 100644 --- a/src/gossip/active_set.zig +++ b/src/gossip/active_set.zig @@ -1,17 +1,19 @@ const std = @import("std"); -const KeyPair = std.crypto.sign.Ed25519.KeyPair; const network = @import("zig-network"); +const sig = @import("../lib.zig"); + +const KeyPair = std.crypto.sign.Ed25519.KeyPair; const EndPoint = network.EndPoint; -const _gossip_data = @import("../gossip/data.zig"); -const SignedGossipData = _gossip_data.SignedGossipData; -const getWallclockMs = _gossip_data.getWallclockMs; -const ContactInfo = _gossip_data.ContactInfo; -const LegacyContactInfo = _gossip_data.LegacyContactInfo; -const Pubkey = @import("../core/pubkey.zig").Pubkey; -const GossipTable = @import("../gossip/table.zig").GossipTable; -const shuffleFirstN = @import("../gossip/pull_request.zig").shuffleFirstN; -const Bloom = @import("../bloom/bloom.zig").Bloom; +const Pubkey = sig.core.Pubkey; +const Bloom = sig.bloom.Bloom; +const ContactInfo = sig.gossip.data.ContactInfo; +const SignedGossipData = sig.gossip.data.SignedGossipData; +const LegacyContactInfo = sig.gossip.data.LegacyContactInfo; +const GossipTable = sig.gossip.table.GossipTable; + +const getWallclockMs = sig.gossip.getWallclockMs; +const shuffleFirstN = sig.gossip.pull_request.shuffleFirstN; const NUM_ACTIVE_SET_ENTRIES: usize = 25; pub const GOSSIP_PUSH_FANOUT: usize = 6; @@ -22,28 +24,29 @@ const BLOOM_MAX_BITS: usize = 1024 * 8 * 4; pub const ActiveSet = struct { // store pubkeys as keys in gossip table bc the data can change - pruned_peers: std.AutoHashMap(Pubkey, Bloom), + // For each peer, a bloom filter is used to store pruned origins + peers: std.AutoHashMap(Pubkey, Bloom), allocator: std.mem.Allocator, const Self = @This(); pub fn init(allocator: std.mem.Allocator) Self { return Self{ - .pruned_peers = std.AutoHashMap(Pubkey, Bloom).init(allocator), + .peers = std.AutoHashMap(Pubkey, Bloom).init(allocator), .allocator = allocator, }; } pub fn deinit(self: *Self) void { - var iter = self.pruned_peers.iterator(); + var iter = self.peers.iterator(); while (iter.next()) |entry| { entry.value_ptr.deinit(); } - self.pruned_peers.deinit(); + self.peers.deinit(); } pub fn len(self: *const Self) u32 { - return self.pruned_peers.count(); + return self.peers.count(); } pub fn rotate( @@ -52,11 +55,11 @@ pub const ActiveSet = struct { peers: []ContactInfo, ) error{OutOfMemory}!void { // clear the existing - var iter = self.pruned_peers.iterator(); + var iter = self.peers.iterator(); while (iter.next()) |entry| { entry.value_ptr.deinit(); } - self.pruned_peers.clearRetainingCapacity(); + self.peers.clearRetainingCapacity(); if (peers.len == 0) { return; @@ -66,7 +69,7 @@ pub const ActiveSet = struct { const bloom_num_items = @max(peers.len, MIN_NUM_BLOOM_ITEMS); for (0..size) |i| { - const entry = try self.pruned_peers.getOrPut(peers[i].pubkey); + const entry = try self.peers.getOrPut(peers[i].pubkey); if (entry.found_existing == false) { // *full* hard restart on blooms -- labs doesnt do this - bug? const bloom = try Bloom.random( @@ -83,7 +86,7 @@ pub const ActiveSet = struct { pub fn prune(self: *Self, from: Pubkey, origin: Pubkey) void { // we only prune peers which we are sending push messages to - if (self.pruned_peers.getEntry(from)) |entry| { + if (self.peers.getEntry(from)) |entry| { const origin_bytes = origin.data; entry.value_ptr.add(&origin_bytes); } @@ -101,8 +104,7 @@ pub const ActiveSet = struct { var active_set_endpoints = try std.ArrayList(EndPoint).initCapacity(allocator, GOSSIP_PUSH_FANOUT); errdefer active_set_endpoints.deinit(); - // change to while loop - var iter = self.pruned_peers.iterator(); + var iter = self.peers.iterator(); while (iter.next()) |entry| { // lookup peer contact info const peer_info = table.getContactInfo(entry.key_ptr.*) orelse continue; @@ -167,7 +169,7 @@ test "gossip.active_set: init/deinit" { const no_prune_fanout_len = fanout.items.len; try std.testing.expect(no_prune_fanout_len > 0); - var iter = active_set.pruned_peers.keyIterator(); + var iter = active_set.peers.keyIterator(); const peer_pubkey = iter.next().?.*; active_set.prune(peer_pubkey, origin); diff --git a/src/gossip/data.zig b/src/gossip/data.zig index 69051e964..f285b49f5 100644 --- a/src/gossip/data.zig +++ b/src/gossip/data.zig @@ -1,34 +1,32 @@ const std = @import("std"); -const SocketAddr = @import("../net/net.zig").SocketAddr; -const Hash = @import("../core/hash.zig").Hash; -const Signature = @import("../core/signature.zig").Signature; -const Transaction = @import("../core/transaction.zig").Transaction; -const Slot = @import("../core/time.zig").Slot; -const bincode = @import("../bincode/bincode.zig"); -const ArrayList = std.ArrayList; -const KeyPair = std.crypto.sign.Ed25519.KeyPair; -const Pubkey = @import("../core/pubkey.zig").Pubkey; -const sanitizeWallclock = @import("./message.zig").sanitizeWallclock; -const PACKET_DATA_SIZE = @import("../net/packet.zig").PACKET_DATA_SIZE; - const network = @import("zig-network"); -const var_int = @import("../utils/varint.zig"); -const var_int_config_u16 = var_int.var_int_config_u16; -const var_int_config_u64 = var_int.var_int_config_u64; +const sig = @import("../lib.zig"); -const ShortVecArrayListConfig = @import("../utils/shortvec.zig").ShortVecArrayListConfig; -const IpAddr = @import("../net/net.zig").IpAddr; -const gossip = @import("sig").gossip; const testing = std.testing; +const bincode = sig.bincode; -const ClientVersion = @import("../version/version.zig").ClientVersion; - +const ArrayList = std.ArrayList; +const KeyPair = std.crypto.sign.Ed25519.KeyPair; const UdpSocket = network.Socket; const TcpListener = network.Socket; -const net = std.net; -const DynamicArrayBitSet = @import("../bloom/bit_set.zig").DynamicArrayBitSet; -const BitVecConfig = @import("../bloom/bit_vec.zig").BitVecConfig; +const SocketAddr = sig.net.SocketAddr; +const Hash = sig.core.Hash; +const Signature = sig.core.Signature; +const Transaction = sig.core.Transaction; +const Slot = sig.core.Slot; +const Pubkey = sig.core.Pubkey; +const IpAddr = sig.net.IpAddr; +const ClientVersion = sig.version.ClientVersion; +const DynamicArrayBitSet = sig.bloom.bit_set.DynamicArrayBitSet; +const BitVecConfig = sig.bloom.bit_vec.BitVecConfig; +const ShortVecArrayListConfig = sig.bincode.shortvec.ShortVecArrayListConfig; + +const sanitizeWallclock = sig.gossip.message.sanitizeWallclock; + +const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; +const var_int_config_u16 = sig.bincode.varint.var_int_config_u16; +const var_int_config_u64 = sig.bincode.varint.var_int_config_u64; /// returns current timestamp in milliseconds pub fn getWallclockMs() u64 { @@ -103,8 +101,8 @@ pub const SignedGossipData = struct { // should always be enough space or is invalid msg var buf: [PACKET_DATA_SIZE]u8 = undefined; const bytes = try bincode.writeToSlice(&buf, self.data, bincode.Params.standard); - var sig = try keypair.sign(bytes, null); - self.signature.data = sig.toBytes(); + var signature = try keypair.sign(bytes, null); + self.signature.data = signature.toBytes(); } pub fn verify(self: *Self, pubkey: Pubkey) !bool { diff --git a/src/gossip/service.zig b/src/gossip/service.zig index ee6bd4cc5..77984dab2 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -1,58 +1,63 @@ const std = @import("std"); const network = @import("zig-network"); -const EndPoint = network.EndPoint; -const Packet = @import("../net/packet.zig").Packet; -const PACKET_DATA_SIZE = @import("../net/packet.zig").PACKET_DATA_SIZE; -const ThreadPoolTask = @import("../utils/thread.zig").ThreadPoolTask; -const ThreadPool = @import("../sync/thread_pool.zig").ThreadPool; -const Task = ThreadPool.Task; -const Batch = ThreadPool.Batch; +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; +const socket_utils = sig.net.socket_utils; +const pull_request = sig.gossip.pull_request; +const pull_response = sig.gossip.pull_response; + const ArrayList = std.ArrayList; const Thread = std.Thread; const AtomicBool = std.atomic.Value(bool); -const UdpSocket = network.Socket; -const SocketAddr = @import("../net/net.zig").SocketAddr; -const endpointToString = @import("../net/net.zig").endpointToString; -const _gossipMessages = @import("message.zig"); -const GossipMessage = _gossipMessages.GossipMessage; -const PruneData = _gossipMessages.PruneData; -const Mux = @import("../sync/mux.zig").Mux; -const RwMux = @import("../sync/mux.zig").RwMux; -const Ping = @import("ping_pong.zig").Ping; -const Pong = @import("ping_pong.zig").Pong; -const bincode = @import("../bincode/bincode.zig"); -const gossip = @import("../gossip/data.zig"); -const LegacyContactInfo = gossip.LegacyContactInfo; -const ContactInfo = @import("data.zig").ContactInfo; -const SignedGossipData = gossip.SignedGossipData; const KeyPair = std.crypto.sign.Ed25519.KeyPair; -const Pubkey = @import("../core/pubkey.zig").Pubkey; -const getWallclockMs = @import("../gossip/data.zig").getWallclockMs; -const _gossip_table = @import("../gossip/table.zig"); -const GossipTable = _gossip_table.GossipTable; -const HashTimeQueue = _gossip_table.HashTimeQueue; -const UNIQUE_PUBKEY_CAPACITY = _gossip_table.UNIQUE_PUBKEY_CAPACITY; -const AutoArrayHashSet = _gossip_table.AutoArrayHashSet; -const Logger = @import("../trace/log.zig").Logger; -const Entry = @import("../trace/entry.zig").Entry; -const pull_request = @import("../gossip/pull_request.zig"); -const GossipPullFilter = pull_request.GossipPullFilter; -const MAX_NUM_PULL_REQUESTS = pull_request.MAX_NUM_PULL_REQUESTS; -const pull_response = @import("../gossip/pull_response.zig"); -const ActiveSet = @import("../gossip/active_set.zig").ActiveSet; -const Hash = @import("../core/hash.zig").Hash; -const socket_utils = @import("../net/socket_utils.zig"); -const Channel = @import("../sync/channel.zig").Channel; -const PingCache = @import("./ping_pong.zig").PingCache; -const PingAndSocketAddr = @import("./ping_pong.zig").PingAndSocketAddr; -const echo = @import("../net/echo.zig"); -const GossipDumpService = @import("../gossip/dump_service.zig").GossipDumpService; -const ServiceManager = @import("../utils/service.zig").ServiceManager; - -const Registry = @import("../prometheus/registry.zig").Registry; -const globalRegistry = @import("../prometheus/registry.zig").globalRegistry; -const GetMetricError = @import("../prometheus/registry.zig").GetMetricError; -const Counter = @import("../prometheus/counter.zig").Counter; +const EndPoint = network.EndPoint; +const UdpSocket = network.Socket; + +const Pubkey = sig.core.Pubkey; +const Hash = sig.core.Hash; +const Entry = sig.trace.entry.Entry; +const Logger = sig.trace.log.Logger; +const Packet = sig.net.Packet; +const EchoServer = sig.net.echo.Server; +const SocketAddr = sig.net.SocketAddr; +const Counter = sig.prometheus.counter.Counter; +const GetMetricError = sig.prometheus.registry.GetMetricError; +const Registry = sig.prometheus.Registry; +const ThreadPoolTask = sig.utils.thread.ThreadPoolTask; +const ThreadPool = sig.sync.ThreadPool; +const Task = sig.sync.ThreadPool.Task; +const Batch = sig.sync.ThreadPool.Batch; +const Mux = sig.sync.Mux; +const RwMux = sig.sync.RwMux; +const Channel = sig.sync.Channel; +const ActiveSet = sig.gossip.active_set.ActiveSet; +const LegacyContactInfo = sig.gossip.data.LegacyContactInfo; +const ContactInfo = sig.gossip.data.ContactInfo; +const GossipVersionedData = sig.gossip.data.GossipVersionedData; +const SignedGossipData = sig.gossip.data.SignedGossipData; +const GossipData = sig.gossip.data.GossipData; +const GossipDumpService = sig.gossip.dump_service.GossipDumpService; +const GossipMessage = sig.gossip.message.GossipMessage; +const PruneData = sig.gossip.message.PruneData; +const GossipTable = sig.gossip.table.GossipTable; +const HashTimeQueue = sig.gossip.table.HashTimeQueue; +const AutoArrayHashSet = sig.gossip.table.AutoArrayHashSet; +const GossipPullFilter = sig.gossip.pull_request.GossipPullFilter; +const Ping = sig.gossip.ping_pong.Ping; +const Pong = sig.gossip.ping_pong.Pong; +const PingCache = sig.gossip.ping_pong.PingCache; +const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr; +const ServiceManager = sig.utils.service_manager.ServiceManager; + +const endpointToString = sig.net.endpointToString; +const globalRegistry = sig.prometheus.globalRegistry; +const getWallclockMs = sig.gossip.data.getWallclockMs; + +const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; +const UNIQUE_PUBKEY_CAPACITY = sig.gossip.table.UNIQUE_PUBKEY_CAPACITY; +const MAX_NUM_PULL_REQUESTS = sig.gossip.pull_request.MAX_NUM_PULL_REQUESTS; +const MAX_BLOOM_SIZE = sig.gossip.pull_request.MAX_BLOOM_SIZE; const PacketBatch = ArrayList(Packet); const GossipMessageWithEndpoint = struct { from_endpoint: EndPoint, message: GossipMessage }; @@ -122,7 +127,7 @@ pub const GossipService = struct { ping_cache_rw: RwMux(*PingCache), logger: Logger, thread_pool: *ThreadPool, - echo_server: echo.Server, + echo_server: EchoServer, stats: GossipStats, @@ -175,7 +180,7 @@ pub const GossipService = struct { const failed_pull_hashes = HashTimeQueue.init(allocator); const push_msg_q = ArrayList(SignedGossipData).init(allocator); - const echo_server = echo.Server.init(allocator, gossip_address.port(), exit); + const echo_server = EchoServer.init(allocator, gossip_address.port(), exit); var entrypoint_list = ArrayList(Entrypoint).init(allocator); if (entrypoints) |eps| { @@ -796,10 +801,10 @@ pub const GossipService = struct { if (top_of_loop_ts - last_push_ts > GOSSIP_PULL_TIMEOUT_MS / 2) { // update wallclock and sign self.my_contact_info.wallclock = getWallclockMs(); - const my_contact_info_value = try gossip.SignedGossipData.initSigned(gossip.GossipData{ + const my_contact_info_value = try SignedGossipData.initSigned(GossipData{ .ContactInfo = try self.my_contact_info.clone(), }, &self.my_keypair); - const my_legacy_contact_info_value = try gossip.SignedGossipData.initSigned(gossip.GossipData{ + const my_legacy_contact_info_value = try SignedGossipData.initSigned(GossipData{ .LegacyContactInfo = LegacyContactInfo.fromContactInfo(&self.my_contact_info), }, &self.my_keypair); @@ -887,7 +892,7 @@ pub const GossipService = struct { /// active set and serialized into packets. fn buildPushMessages(self: *Self, push_cursor: *u64) !ArrayList(ArrayList(Packet)) { // TODO: find a better static value? - var buf: [512]gossip.GossipVersionedData = undefined; + var buf: [512]GossipVersionedData = undefined; const gossip_entries = blk: { var gossip_table_lock = self.gossip_table_rw.read(); @@ -1080,7 +1085,7 @@ pub const GossipService = struct { // update wallclock and sign self.my_contact_info.wallclock = now; - const my_contact_info_value = try gossip.SignedGossipData.initSigned(gossip.GossipData{ + const my_contact_info_value = try SignedGossipData.initSigned(GossipData{ .LegacyContactInfo = LegacyContactInfo.fromContactInfo(&self.my_contact_info), }, &self.my_keypair); @@ -1155,7 +1160,7 @@ pub const GossipService = struct { self.allocator, self.gossip_table, self.filter, - gossip.getWallclockMs(), + getWallclockMs(), @as(usize, @max(output_limit, 0)), ) catch { return; @@ -1450,62 +1455,6 @@ pub const GossipService = struct { } } - /// builds a prune message for a list of origin Pubkeys and serializes the values - /// into packets to send to the prune_destination. - fn buildPruneMessage( - self: *Self, - /// origin Pubkeys which will be pruned - failed_origins: *const std.AutoArrayHashMap(Pubkey, void), - /// the pubkey of the node which we will send the prune message to - prune_destination: Pubkey, - ) error{ CantFindContactInfo, InvalidGossipAddress, OutOfMemory, SignatureError }!ArrayList(Packet) { - const from_contact_info = blk: { - var gossip_table_lock = self.gossip_table_rw.read(); - defer gossip_table_lock.unlock(); - - const gossip_table: *const GossipTable = gossip_table_lock.get(); - break :blk gossip_table.getContactInfo(prune_destination) orelse { - return error.CantFindContactInfo; - }; - }; - const from_gossip_addr = from_contact_info.getSocket(.gossip) orelse return error.InvalidGossipAddress; - gossip.sanitizeSocket(&from_gossip_addr) catch return error.InvalidGossipAddress; - const from_gossip_endpoint = from_gossip_addr.toEndpoint(); - - const failed_origin_len = failed_origins.keys().len; - const n_packets = failed_origins.keys().len / MAX_PRUNE_DATA_NODES; - var prune_packets = try ArrayList(Packet).initCapacity(self.allocator, n_packets); - errdefer prune_packets.deinit(); - - const now = getWallclockMs(); - var packet_buf: [PACKET_DATA_SIZE]u8 = undefined; - - var index: usize = 0; - while (true) { - const prune_size = @min(failed_origin_len - index, MAX_PRUNE_DATA_NODES); - if (prune_size == 0) break; - - var prune_data = PruneData.init( - self.my_pubkey, - failed_origins.keys()[index..(prune_size + index)], - prune_destination, - now, - ); - prune_data.sign(&self.my_keypair) catch return error.SignatureError; - - // put it into a packet - const msg = GossipMessage{ .PruneMessage = .{ self.my_pubkey, prune_data } }; - // msg should never be bigger than the PacketSize and serialization shouldnt fail (unrecoverable) - const msg_slice = bincode.writeToSlice(&packet_buf, msg, bincode.Params{}) catch unreachable; - const packet = Packet.init(from_gossip_endpoint, packet_buf, msg_slice.len); - try prune_packets.append(packet); - - index += prune_size; - } - - return prune_packets; - } - pub fn handleBatchPushMessages( self: *Self, batch_push_messages: *const ArrayList(PushMessage), @@ -2222,7 +2171,7 @@ test "gossip.service: tests handling prune messages" { var as_lock = gossip_service.active_set_rw.read(); var as: *const ActiveSet = as_lock.get(); try std.testing.expect(as.len() > 0); // FIX - var iter = as.pruned_peers.keyIterator(); + var iter = as.peers.keyIterator(); const peer0 = iter.next().?.*; as_lock.unlock(); @@ -2244,7 +2193,7 @@ test "gossip.service: tests handling prune messages" { var as_lock2 = gossip_service.active_set_rw.read(); var as2: *const ActiveSet = as_lock2.get(); - try std.testing.expect(as2.pruned_peers.get(peer0).?.contains(&prunes[0].data)); + try std.testing.expect(as2.peers.get(peer0).?.contains(&prunes[0].data)); as_lock2.unlock(); } @@ -2369,7 +2318,7 @@ test "gossip.service: tests handle pull request" { var rando_keypair = try KeyPair.create([_]u8{22} ** 32); const rando_pubkey = Pubkey.fromPublicKey(&rando_keypair.public_key); - var ci_data = gossip.GossipData.randomFromIndex(rng.random(), 0); + var ci_data = GossipData.randomFromIndex(rng.random(), 0); ci_data.LegacyContactInfo.id = rando_pubkey; const gossip_value = try SignedGossipData.initSigned(ci_data, &rando_keypair); @@ -2441,7 +2390,7 @@ test "gossip.service: test build prune messages and handle push messages" { var gossip_socket = SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 20); send_contact_info.gossip = gossip_socket; - const ci_value = try SignedGossipData.initSigned(gossip.GossipData{ + const ci_value = try SignedGossipData.initSigned(GossipData{ .LegacyContactInfo = send_contact_info, }, &my_keypair); var lg = gossip_service.gossip_table_rw.write(); @@ -2641,14 +2590,14 @@ test "gossip.gossip_service: test packet verification" { const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{&gossip_service}); var rng = std.rand.DefaultPrng.init(getWallclockMs()); - var data = gossip.GossipData.randomFromIndex(rng.random(), 0); + var data = GossipData.randomFromIndex(rng.random(), 0); data.LegacyContactInfo.id = id; data.LegacyContactInfo.wallclock = 0; var value = try SignedGossipData.initSigned(data, &keypair); try std.testing.expect(try value.verify(id)); - var values = [_]gossip.SignedGossipData{value}; + var values = [_]SignedGossipData{value}; const message = GossipMessage{ .PushMessage = .{ id, &values }, }; @@ -2668,9 +2617,9 @@ test "gossip.gossip_service: test packet verification" { var packet_batch_2 = ArrayList(Packet).init(allocator); // send one which fails sanitization - var value_v2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 2), &keypair); - value_v2.data.EpochSlots[0] = gossip.MAX_EPOCH_SLOTS; - var values_v2 = [_]gossip.SignedGossipData{value_v2}; + var value_v2 = try SignedGossipData.initSigned(GossipData.randomFromIndex(rng.random(), 2), &keypair); + value_v2.data.EpochSlots[0] = sig.gossip.data.MAX_EPOCH_SLOTS; + var values_v2 = [_]SignedGossipData{value_v2}; const message_v2 = GossipMessage{ .PushMessage = .{ id, &values_v2 }, }; @@ -2681,8 +2630,8 @@ test "gossip.gossip_service: test packet verification" { // send one with a incorrect signature var rand_keypair = try KeyPair.create([_]u8{3} ** 32); - const value2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 0), &rand_keypair); - var values2 = [_]gossip.SignedGossipData{value2}; + const value2 = try SignedGossipData.initSigned(GossipData.randomFromIndex(rng.random(), 0), &rand_keypair); + var values2 = [_]SignedGossipData{value2}; const message2 = GossipMessage{ .PushMessage = .{ id, &values2 }, }; @@ -2694,21 +2643,21 @@ test "gossip.gossip_service: test packet verification" { // send it with a SignedGossipData which hash a slice { const rand_pubkey = Pubkey.fromPublicKey(&rand_keypair.public_key); - var dshred = gossip.DuplicateShred.random(rng.random()); + var dshred = sig.gossip.data.DuplicateShred.random(rng.random()); var chunk: [32]u8 = .{1} ** 32; dshred.chunk = &chunk; dshred.wallclock = 1714155765121; dshred.slot = 16592333628234015598; dshred.shred_index = 3853562894; - dshred.shred_type = gossip.ShredType.Data; + dshred.shred_type = sig.gossip.data.ShredType.Data; dshred.num_chunks = 99; dshred.chunk_index = 69; dshred.from = rand_pubkey; - const dshred_data = gossip.GossipData{ + const dshred_data = GossipData{ .DuplicateShred = .{ 1, dshred }, }; const dshred_value = try SignedGossipData.initSigned(dshred_data, &rand_keypair); - var values3 = [_]gossip.SignedGossipData{dshred_value}; + var values3 = [_]SignedGossipData{dshred_value}; const message3 = GossipMessage{ .PushMessage = .{ id, &values3 }, }; @@ -2793,11 +2742,11 @@ test "gossip.gossip_service: process contact info push packet" { // new contact info const legacy_contact_info = LegacyContactInfo.default(id); - const gossip_data = gossip.GossipData{ + const gossip_data = GossipData{ .LegacyContactInfo = legacy_contact_info, }; - const gossip_value = try gossip.SignedGossipData.initSigned(gossip_data, &kp); - const heap_values = try gossip_value_allocator.dupe(gossip.SignedGossipData, &.{gossip_value}); + const gossip_value = try SignedGossipData.initSigned(gossip_data, &kp); + const heap_values = try gossip_value_allocator.dupe(SignedGossipData, &.{gossip_value}); const msg = GossipMessage{ .PushMessage = .{ id, heap_values }, }; diff --git a/src/gossip/table.zig b/src/gossip/table.zig index ca6b6a6a0..872ee7c68 100644 --- a/src/gossip/table.zig +++ b/src/gossip/table.zig @@ -1,29 +1,31 @@ const std = @import("std"); +const sig = @import("../lib.zig"); + +const bincode = sig.bincode; + const AutoArrayHashMap = std.AutoArrayHashMap; const AutoHashMap = std.AutoHashMap; -const bincode = @import("../bincode/bincode.zig"); -const Hash = @import("../core/hash.zig").Hash; -const GossipTableShards = @import("./shards.zig").GossipTableShards; -const _gossip_data = @import("data.zig"); -const SignedGossipData = _gossip_data.SignedGossipData; -const GossipData = _gossip_data.GossipData; -const GossipVersionedData = _gossip_data.GossipVersionedData; -const GossipKey = _gossip_data.GossipKey; -const LegacyContactInfo = _gossip_data.LegacyContactInfo; -const ContactInfo = _gossip_data.ContactInfo; -const getWallclockMs = _gossip_data.getWallclockMs; -const Vote = _gossip_data.Vote; - -const ThreadPool = @import("../sync/thread_pool.zig").ThreadPool; -const Task = ThreadPool.Task; -const Batch = ThreadPool.Batch; - -const Transaction = @import("../core/transaction.zig").Transaction; -const Pubkey = @import("../core/pubkey.zig").Pubkey; const KeyPair = std.crypto.sign.Ed25519.KeyPair; -const SocketAddr = @import("../net/net.zig").SocketAddr; -const PACKET_DATA_SIZE = @import("../net/packet.zig").PACKET_DATA_SIZE; +const GossipTableShards = sig.gossip.shards.GossipTableShards; +const SignedGossipData = sig.gossip.data.SignedGossipData; +const GossipData = sig.gossip.data.GossipData; +const GossipVersionedData = sig.gossip.data.GossipVersionedData; +const GossipKey = sig.gossip.data.GossipKey; +const LegacyContactInfo = sig.gossip.data.LegacyContactInfo; +const ContactInfo = sig.gossip.data.ContactInfo; +const Vote = sig.gossip.data.Vote; +const ThreadPool = sig.sync.ThreadPool; +const Task = sig.sync.ThreadPool.Task; +const Batch = sig.sync.ThreadPool.Batch; +const Hash = sig.core.hash.Hash; +const Transaction = sig.core.transaction.Transaction; +const Pubkey = sig.core.Pubkey; +const SocketAddr = sig.net.SocketAddr; + +const getWallclockMs = sig.gossip.data.getWallclockMs; + +const PACKET_DATA_SIZE = sig.net.packet.PACKET_DATA_SIZE; pub const UNIQUE_PUBKEY_CAPACITY: usize = 8192; pub const MAX_TABLE_SIZE: usize = 1_000_000; // TODO: better value for this @@ -698,11 +700,10 @@ pub const GossipTable = struct { bincode.free(self.allocator, versioned_value.value.data); } + /// Trim when over 90% of max capacity pub fn shouldTrim(self: *const Self, max_pubkey_capacity: usize) bool { const n_pubkeys = self.pubkey_to_values.count(); - // 90% close to capacity - const should_trim = 10 * n_pubkeys > 11 * max_pubkey_capacity; - return should_trim; + return (10 * n_pubkeys > 9 * max_pubkey_capacity); } pub fn attemptTrim(self: *Self, max_pubkey_capacity: usize) error{OutOfMemory}!void { diff --git a/src/lib.zig b/src/lib.zig index 28889d0c3..93ec05e3c 100644 --- a/src/lib.zig +++ b/src/lib.zig @@ -2,6 +2,7 @@ pub const accounts_db = @import("accountsdb/lib.zig"); pub const bincode = @import("bincode/bincode.zig"); pub const bloom = @import("bloom/lib.zig"); pub const core = @import("core/lib.zig"); +pub const crypto = @import("crypto/fnv.zig"); pub const gossip = @import("gossip/lib.zig"); pub const net = @import("net/lib.zig"); pub const prometheus = @import("prometheus/lib.zig"); diff --git a/src/version/version.zig b/src/version/version.zig index bca877f78..8566475a3 100644 --- a/src/version/version.zig +++ b/src/version/version.zig @@ -1,4 +1,6 @@ -const var_int_config_u16 = @import("../utils/varint.zig").var_int_config_u16; +const sig = @import("../lib.zig"); + +const var_int_config_u16 = sig.bincode.varint.var_int_config_u16; pub const CURRENT_CLIENT_VERSION: ClientVersion = ClientVersion.new(0, 1, 0, 0, 0, 4);