diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 4191e9dac..7c3d2771e 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -9,6 +9,7 @@ const Level = @import("../trace/level.zig").Level; const io = std.io; const Pubkey = @import("../core/pubkey.zig").Pubkey; const SocketAddr = @import("../net/net.zig").SocketAddr; +const echo = @import("../net/echo.zig"); const GossipService = @import("../gossip/gossip_service.zig").GossipService; const servePrometheus = @import("../prometheus/http.zig").servePrometheus; const globalRegistry = @import("../prometheus/registry.zig").globalRegistry; @@ -115,12 +116,11 @@ fn gossip(_: []const []const u8) !void { var gossip_port: u16 = @intCast(gossip_port_option.value.int.?); var gossip_address = SocketAddr.initIpv4(.{ 0, 0, 0, 0 }, gossip_port); - logger.infof("gossip port: {d}\n", .{gossip_port}); + logger.infof("gossip port: {d}", .{gossip_port}); // setup contact info var my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key, false); var contact_info = LegacyContactInfo.default(my_pubkey); - contact_info.shred_version = 0; contact_info.gossip = gossip_address; var entrypoints = std.ArrayList(SocketAddr).init(gpa_allocator); @@ -134,7 +134,35 @@ fn gossip(_: []const []const u8) !void { try entrypoints.append(value); } } - std.debug.print("entrypoints: {any}\n", .{entrypoints.items}); + + // log entrypoints + var entrypoint_string = try gpa_allocator.alloc(u8, 53 * entrypoints.items.len); + defer gpa_allocator.free(entrypoint_string); + var stream = std.io.fixedBufferStream(entrypoint_string); + var writer = stream.writer(); + for (0.., entrypoints.items) |i, entrypoint| { + try entrypoint.toAddress().format("", .{}, writer); + if (i != entrypoints.items.len - 1) try writer.writeAll(", "); + } + logger.infof("entrypoints: {s}", .{entrypoint_string[0..stream.pos]}); + + // determine our shred version. in the solana-labs client, this approach is only + // used for validation. normally, shred version comes from the snapshot. 
+ contact_info.shred_version = loop: for (entrypoints.items) |entrypoint| { + if (echo.requestIpEcho(gpa_allocator, entrypoint.toAddress(), .{})) |response| { + if (response.shred_version) |shred_version| { + var addr_str = entrypoint.toString(); + logger.infof( + "shred version: {} - from entrypoint ip echo: {s}", + .{ shred_version.value, addr_str[0][0..addr_str[1]] }, + ); + break shred_version.value; + } + } else |_| {} + } else { + logger.warn("could not get a shred version from an entrypoint"); + break :loop 0; + }; var exit = std.atomic.Atomic(bool).init(false); var gossip_service = try GossipService.init( @@ -163,7 +191,7 @@ fn gossip(_: []const []const u8) !void { /// Uses same allocator for both registry and http adapter. fn spawnMetrics(allocator: std.mem.Allocator, logger: Logger) !std.Thread { var metrics_port: u16 = @intCast(metrics_port_option.value.int.?); - logger.infof("metrics port: {d}\n", .{metrics_port}); + logger.infof("metrics port: {d}", .{metrics_port}); const registry = globalRegistry(); return try std.Thread.spawn(.{}, servePrometheus, .{ allocator, registry, metrics_port }); } diff --git a/src/gossip/crds_table.zig b/src/gossip/crds_table.zig index d680ab2a5..e352d056b 100644 --- a/src/gossip/crds_table.zig +++ b/src/gossip/crds_table.zig @@ -25,6 +25,7 @@ const Transaction = @import("../core/transaction.zig").Transaction; const Pubkey = @import("../core/pubkey.zig").Pubkey; const KeyPair = std.crypto.sign.Ed25519.KeyPair; const RwLock = std.Thread.RwLock; +const SocketAddr = @import("../net/net.zig").SocketAddr; const PACKET_DATA_SIZE = @import("./packet.zig").PACKET_DATA_SIZE; @@ -432,7 +433,20 @@ pub const CrdsTable = struct { return false; } - // ** triming values in the crdstable ** + /// ** trimming values in the crdstable ** + /// + /// This frees the memory for any pointers in the CrdsData. + /// Be sure that this CrdsData is not being used anywhere else when calling this. 
+ /// + /// This method is not safe because neither CrdsTable nor CrdsValue + /// provide any guarantees that the CrdsValue being removed is not + /// also being accessed somewhere else in the code after this is called. + /// Since this frees the CrdsValue, any accesses of the CrdsValue + /// after this function is called will result in a segfault. + /// + /// TODO: implement a safer approach to avoid dangling pointers, such as: + /// - removal buffer that is populated here and freed later + /// - reference counting for all crds values pub fn remove(self: *Self, label: CrdsValueLabel) error{ LabelNotFound, OutOfMemory }!void { const now = crds.getWallclockMs(); @@ -536,6 +550,7 @@ pub const CrdsTable = struct { std.debug.assert(did_remove); new_entry_indexs.put(entry_index, {}) catch unreachable; } + bincode.free(self.allocator, versioned_value.value.data); } pub fn attemptTrim(self: *Self, max_pubkey_capacity: usize) error{OutOfMemory}!void { @@ -673,6 +688,20 @@ pub const CrdsTable = struct { return output; } + + pub fn getContactInfoByGossipAddr( + self: *const Self, + gossip_addr: SocketAddr, + ) ?LegacyContactInfo { + var contact_indexs = self.contact_infos.keys(); + for (contact_indexs) |index| { + const entry: CrdsVersionedValue = self.store.values()[index]; + if (entry.value.data.LegacyContactInfo.gossip.eql(&gossip_addr)) { + return entry.value.data.LegacyContactInfo; + } + } + return null; + } }; pub const HashTimeQueue = struct { diff --git a/src/gossip/gossip_service.zig b/src/gossip/gossip_service.zig index 39803fd3b..190ea55b3 100644 --- a/src/gossip/gossip_service.zig +++ b/src/gossip/gossip_service.zig @@ -93,6 +93,9 @@ pub const MAX_NUM_CRDS_VALUES_PULL_RESPONSE = 20; // TODO: this is approx the ru pub const MAX_PRUNE_DATA_NODES: usize = 32; pub const NUM_ACTIVE_SET_ENTRIES: usize = 25; +// TODO: replace with get_epoch_duration when BankForks is supported +const DEFAULT_EPOCH_DURATION: u64 = 172800000; + const Config = struct { mode: enum { normal, 
tests, bench } = .normal }; pub const GossipService = struct { @@ -100,10 +103,12 @@ pub const GossipService = struct { // note: this contact info should not change gossip_socket: UdpSocket, + /// This contact info is mutated by the buildMessages thread, so it must + /// only be read by that thread, or it needs a synchronization mechanism. my_contact_info: LegacyContactInfo, my_keypair: KeyPair, my_pubkey: Pubkey, - my_shred_version: u16, + my_shred_version: std.atomic.Atomic(u16), exit: *AtomicBool, // communication between threads @@ -118,7 +123,9 @@ pub const GossipService = struct { // pull message things failed_pull_hashes_mux: Mux(HashTimeQueue), - entrypoints: ArrayList(SocketAddr), + /// This entrypoint list is mutated by the buildMessages thread, so it must + /// only be read by that thread, or it needs a synchronization mechanism. + entrypoints: ArrayList(Entrypoint), ping_cache_rw: RwMux(PingCache), logger: Logger, thread_pool: *ThreadPool, @@ -127,6 +134,8 @@ pub const GossipService = struct { // used for benchmarking messages_processed: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), + const Entrypoint = struct { addr: SocketAddr, info: ?LegacyContactInfo = null }; + const Self = @This(); pub fn init( @@ -173,11 +182,17 @@ pub const GossipService = struct { var echo_server = echo.Server.init(allocator, my_contact_info.gossip.port(), logger, exit); + var entrypoint_list = ArrayList(Entrypoint).init(allocator); + if (entrypoints) |eps| { + try entrypoint_list.ensureTotalCapacityPrecise(eps.items.len); + for (eps.items) |ep| entrypoint_list.appendAssumeCapacity(.{ .addr = ep }); + } + return Self{ .my_contact_info = my_contact_info, .my_keypair = my_keypair, .my_pubkey = my_pubkey, - .my_shred_version = my_shred_version, + .my_shred_version = std.atomic.Atomic(u16).init(my_shred_version), .gossip_socket = gossip_socket, .exit = exit, .packet_incoming_channel = packet_incoming_channel, @@ -188,7 +203,7 @@ pub const GossipService = struct { 
.push_msg_queue_mux = Mux(ArrayList(CrdsValue)).init(push_msg_q), .active_set_rw = RwMux(ActiveSet).init(active_set), .failed_pull_hashes_mux = Mux(HashTimeQueue).init(failed_pull_hashes), - .entrypoints = entrypoints orelse ArrayList(SocketAddr).init(allocator), + .entrypoints = entrypoint_list, .ping_cache_rw = RwMux(PingCache).init( try PingCache.init( allocator, @@ -477,7 +492,32 @@ pub const GossipService = struct { const protocol_messages = maybe_protocol_messages.?; defer { for (protocol_messages) |*msg| { - bincode.free(self.allocator, msg.message); + // Important: this uses shallowFree instead of bincode.free + // + // The message contains some messaging metadata plus a + // payload of a CrdsValue. The metadata won't be needed + // after this iteration is complete. The payload will be + // needed since it is stored in the CrdsTable. + // + // bincode.free would free the entire message including the + // payload. This would lead to a segfault if the data is + // accessed from the CrdsTable later. + // + // Not freeing at all would lead to a memory leak of any + // allocations in the metadata. + // + // The compromise is a "shallow" free that only frees the + // messaging metadata. CrdsValue ownership will be + // transferred to CrdsTable. The CrdsTable implementation + // becomes responsible for freeing any CrdsValues when + // needed. + // + // TODO: this approach is not ideal because it is difficult + // to maintain. Another approach such as reference counting + // would be safer. For more info, see: + // - CrdsTable.remove + // - https://github.com/Syndica/sig/pull/69 + msg.message.shallowFree(self.allocator); } self.verified_incoming_channel.allocator.free(protocol_messages); } @@ -510,7 +550,7 @@ pub const GossipService = struct { continue; } // Allow spy nodes with shred-verion == 0 to pull from other nodes. 
- if (data.shred_version != 0 and data.shred_version != self.my_shred_version) { + if (data.shred_version != 0 and data.shred_version != self.my_shred_version.load(.Monotonic)) { // non-matching shred version continue; } @@ -657,6 +697,8 @@ pub const GossipService = struct { var last_push_ts: u64 = 0; var push_cursor: u64 = 0; var should_send_pull_requests = true; + var entrypoints_identified = false; + var shred_version_assigned = false; while (!self.exit.load(std.atomic.Ordering.Unordered)) { const top_of_loop_ts = getWallclockMs(); @@ -688,6 +730,10 @@ pub const GossipService = struct { // trim data self.trimMemory(getWallclockMs()) catch @panic("out of memory"); + // initialize cluster data from crds values + entrypoints_identified = entrypoints_identified or self.populateEntrypointsFromCrdsTable(); + shred_version_assigned = shred_version_assigned or self.assignDefaultShredVersionFromEntrypoint(); + // periodic things if (top_of_loop_ts - last_push_ts > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2) { // update wallclock and sign @@ -716,7 +762,7 @@ pub const GossipService = struct { std.time.sleep(time_left_ms * std.time.ns_per_ms); } } - self.logger.infof("build_messages loop closed\n", .{}); + self.logger.infof("build_messages loop closed", .{}); } pub fn rotateActiveSet( @@ -887,21 +933,10 @@ pub const GossipService = struct { var rng = std.rand.DefaultPrng.init(now); var entrypoint_index: i16 = -1; if (self.entrypoints.items.len != 0) blk: { - var crds_table_lg = self.crds_table_rw.read(); - defer crds_table_lg.unlock(); - var maybe_entrypoint_index = rng.random().intRangeAtMost(usize, 0, self.entrypoints.items.len - 1); - const entrypoint = self.entrypoints.items[maybe_entrypoint_index]; - - const crds_table: *const CrdsTable = crds_table_lg.get(); - const contact_infos = try crds_table.getAllContactInfos(); - defer contact_infos.deinit(); - - for (contact_infos.items) |contact_info| { - if (contact_info.gossip.eql(&entrypoint)) { - // early exit - we already 
have the peers in our contact info - break :blk; - } + if (self.entrypoints.items[maybe_entrypoint_index].info) |_| { + // early exit - we already have the peer in our contact info + break :blk; } // we dont have them so well add them to the peer list (as default contact info) entrypoint_index = @intCast(maybe_entrypoint_index); @@ -986,14 +1021,14 @@ pub const GossipService = struct { // append entrypoint msgs if (should_send_to_entrypoint) { - const entrypoint_addr = self.entrypoints.items[@as(usize, @intCast(entrypoint_index))]; + const entrypoint = self.entrypoints.items[@as(usize, @intCast(entrypoint_index))]; for (filters.items) |filter| { const protocol_msg = Protocol{ .PullRequest = .{ filter, my_contact_info_value } }; var packet = &packet_batch.items[packet_index]; var bytes = try bincode.writeToSlice(&packet.data, protocol_msg, bincode.Params{}); packet.size = bytes.len; - packet.addr = entrypoint_addr.toEndpoint(); + packet.addr = entrypoint.addr.toEndpoint(); packet_index += 1; } } @@ -1515,7 +1550,13 @@ pub const GossipService = struct { try crds_table.purged.trim(purged_cutoff_timestamp); try crds_table.attemptTrim(CRDS_UNIQUE_PUBKEY_CAPACITY); - try crds_table.removeOldLabels(now, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + + // TODO: condition timeout on stake weight: + // - values from nodes with non-zero stake: epoch duration + // - values from nodes with zero stake: + // - if all nodes have zero stake: epoch duration + // - if any other nodes have non-zero stake: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS (15s) + try crds_table.removeOldLabels(now, DEFAULT_EPOCH_DURATION); } const failed_insert_cutoff_timestamp = now -| FAILED_INSERTS_RETENTION_MS; @@ -1528,6 +1569,47 @@ pub const GossipService = struct { } } + /// Attempts to associate each entrypoint address with a contact info. + /// Returns true if all entrypoints have been identified + /// + /// Acquires the crds table lock regardless of whether the crds table is used. 
+ fn populateEntrypointsFromCrdsTable(self: *Self) bool { + var identified_all = true; + + var crds_table_lock = self.crds_table_rw.read(); + defer crds_table_lock.unlock(); + var crds_table: *const CrdsTable = crds_table_lock.get(); + + for (self.entrypoints.items) |*entrypoint| { + if (entrypoint.info == null) { + entrypoint.info = crds_table.getContactInfoByGossipAddr(entrypoint.addr); + } + identified_all = identified_all and entrypoint.info != null; + } + return identified_all; + } + + /// if we have no shred version, attempt to get one from an entrypoint. + /// Returns true if the shred version is set to non-zero + fn assignDefaultShredVersionFromEntrypoint(self: *Self) bool { + if (self.my_shred_version.load(.Monotonic) != 0) return true; + for (self.entrypoints.items) |entrypoint| { + if (entrypoint.info) |info| { + if (info.shred_version != 0) { + var addr_str = entrypoint.addr.toString(); + self.logger.infof( + "shred version: {} - from entrypoint contact info: {s}", + .{ info.shred_version, addr_str[0][0..addr_str[1]] }, + ); + self.my_shred_version.store(info.shred_version, .Monotonic); + self.my_contact_info.shred_version = info.shred_version; + return true; + } + } + } + return false; + } + /// drains values from the push queue and inserts them into the crds table. /// when inserting values in the crds table, any errors are ignored. 
fn drainPushQueueToCrdsTable( @@ -1618,7 +1700,8 @@ pub const GossipService = struct { continue; } // filter matching shred version or my_shred_version == 0 - if (self.my_shred_version != 0 and self.my_shred_version != peer_info.shred_version) { + const my_shred_version = self.my_shred_version.load(.Monotonic); + if (my_shred_version != 0 and my_shred_version != peer_info.shred_version) { continue; } // filter on valid gossip address @@ -1644,17 +1727,22 @@ pub const GossipService = struct { // we use swap remove which just reorders the array // (order dm), so we just track the new len -- ie, no allocations/frees var crds_values_array = ArrayList(CrdsValue).fromOwnedSlice(self.allocator, crds_values); - if (crds_table.check_matching_shred_version(from_pubkey, self.my_shred_version)) { + const my_shred_version = self.my_shred_version.load(.Monotonic); + if (my_shred_version == 0) { + return crds_values_array.items.len; + } + if (crds_table.check_matching_shred_version(from_pubkey, my_shred_version)) { for (crds_values, 0..) |*crds_value, i| { switch (crds_value.data) { // always allow contact info + node instance to update shred versions + .ContactInfo => {}, .LegacyContactInfo => {}, .NodeInstance => {}, else => { // only allow other values with matching shred versions if (!crds_table.check_matching_shred_version( crds_value.id(), - self.my_shred_version, + my_shred_version, )) { _ = crds_values_array.swapRemove(i); } @@ -1665,6 +1753,7 @@ pub const GossipService = struct { for (crds_values, 0..) 
|*crds_value, i| { switch (crds_value.data) { // always allow contact info + node instance to update shred versions + .ContactInfo => {}, .LegacyContactInfo => {}, .NodeInstance => {}, else => { diff --git a/src/gossip/protocol.zig b/src/gossip/protocol.zig index 651565f21..8104277a0 100644 --- a/src/gossip/protocol.zig +++ b/src/gossip/protocol.zig @@ -102,6 +102,30 @@ pub const Protocol = union(enum(u32)) { .PongMessage => {}, } } + + /// Frees the ephemeral messaging data that is only needed + /// for the initial processing of an incoming message. + /// + /// Does not free the contained crds data that + /// needs to be stored in the crds table. + pub fn shallowFree(self: *Protocol, allocator: std.mem.Allocator) void { + switch (self.*) { + .PullRequest => |*msg| { + msg[0].deinit(); + }, + .PullResponse => |*msg| { + allocator.free(msg[1]); + }, + .PushMessage => |*msg| { + allocator.free(msg[1]); + }, + .PruneMessage => |*msg| { + allocator.free(msg[1].prunes); + }, + .PingMessage => {}, + .PongMessage => {}, + } + } }; pub fn sanitizeWallclock(wallclock: u64) !void { diff --git a/src/net/echo.zig b/src/net/echo.zig index 5c21a5ce3..84f8b3676 100644 --- a/src/net/echo.zig +++ b/src/net/echo.zig @@ -9,23 +9,22 @@ const Atomic = std.atomic.Atomic; const assert = std.debug.assert; const testing = std.testing; const http = std.http; +const bincode = @import("../bincode/bincode.zig"); const MAX_PORT_COUNT_PER_MSG: usize = 4; const MAX_REQ_HEADER_SIZE = 8192; const SERVER_LISTENER_LINGERING_TIMEOUT: u64 = std.time.ns_per_s * 1; +const HEADER_LENGTH: usize = 4; const IpEchoServerMessage = struct { - tcp_ports: [MAX_PORT_COUNT_PER_MSG]u16, - udp_ports: [MAX_PORT_COUNT_PER_MSG]u16, + tcp_ports: [MAX_PORT_COUNT_PER_MSG]u16 = [_]u16{0} ** MAX_PORT_COUNT_PER_MSG, + udp_ports: [MAX_PORT_COUNT_PER_MSG]u16 = [_]u16{0} ** MAX_PORT_COUNT_PER_MSG, const Self = @This(); pub fn init(tcp_ports: []u16, udp_ports: []u16) Self { assert(tcp_ports.len <= MAX_PORT_COUNT_PER_MSG and 
udp_ports.len <= MAX_PORT_COUNT_PER_MSG); - var self = Self{ - .tcp_ports = [_]u16{0} ** MAX_PORT_COUNT_PER_MSG, - .udp_ports = [_]u16{0} ** MAX_PORT_COUNT_PER_MSG, - }; + var self = Self{}; std.mem.copyForwards(u16, &self.tcp_ports, tcp_ports); std.mem.copyForwards(u16, &self.udp_ports, udp_ports); @@ -301,6 +300,25 @@ pub fn handleRequest( logger.debug("done handling request"); } +pub fn requestIpEcho( + allocator: std.mem.Allocator, + addr: std.net.Address, + message: IpEchoServerMessage, +) !IpEchoServerResponse { + // connect + send + const conn = try std.net.tcpConnectToAddress(addr); + defer conn.close(); + try conn.writeAll(&(.{0} ** HEADER_LENGTH)); + try bincode.write(allocator, conn.writer(), message, .{}); + try conn.writeAll("\n"); + + // get response + var buff: [32]u8 = undefined; + const len = try conn.readAll(&buff); + var bufferStream = std.io.fixedBufferStream(buff[HEADER_LENGTH..len]); + return try bincode.read(allocator, IpEchoServerResponse, bufferStream.reader(), .{}); +} + test "net.echo: Server works" { const port: u16 = 34333; diff --git a/src/net/net.zig b/src/net/net.zig index b768f91f4..f331700d8 100644 --- a/src/net/net.zig +++ b/src/net/net.zig @@ -197,6 +197,23 @@ pub const SocketAddr = union(enum(u8)) { } } + pub fn toAddress(self: Self) std.net.Address { + return switch (self) { + .V4 => |a| std.net.Address.initIp4(a.ip.octets, a.port), + .V6 => |a| std.net.Address.initIp6(a.ip.octets, a.port, a.flowinfo, a.scope_id), + }; + } + + /// returns: + /// - array: the string, plus some extra bytes at the end + /// - integer: length of the string within the array + pub fn toString(self: Self) struct { [53]u8, usize } { + var buf: [53]u8 = undefined; + var stream = std.io.fixedBufferStream(&buf); + self.toAddress().format("", .{}, stream.writer()) catch unreachable; + return .{ buf, stream.pos }; + } + pub fn isUnspecified(self: *const Self) bool { switch (self.*) { .V4 => |addr| {