diff --git a/src/gossip/fuzz.zig b/src/gossip/fuzz.zig
index e37823849..ca88bae43 100644
--- a/src/gossip/fuzz.zig
+++ b/src/gossip/fuzz.zig
@@ -380,7 +380,9 @@ pub fn run() !void {
         };
 
         // batch it
-        var packet_batch = std.ArrayList(Packet).init(allocator);
+        var packet_batch = std.ArrayList(Packet).init(gossip_service_fuzzer.allocator);
+        defer packet_batch.deinit();
+
         try packet_batch.append(send_packet);
 
         msg_count +|= 1;
@@ -391,7 +393,7 @@
         }
 
         // send it
-        try gossip_service_fuzzer.packet_outgoing_channel.send(packet_batch);
+        try gossip_service_fuzzer.packet_outgoing_channel.send(packet_batch.moveToUnmanaged());
 
         std.time.sleep(SLEEP_TIME);
 
diff --git a/src/gossip/ping_pong.zig b/src/gossip/ping_pong.zig
index 85c1f41e1..ad5ad7bd7 100644
--- a/src/gossip/ping_pong.zig
+++ b/src/gossip/ping_pong.zig
@@ -209,31 +209,38 @@ pub const PingCache = struct {
     }
 
-    /// Filters valid peers according to `PingCache` state and returns them along with any possible pings that need to be sent out.
-    ///
-    /// *Note*: caller is responsible for deinit `ArrayList`(s) returned!
+    /// Filters valid peers according to `PingCache` state, writing the indexes
+    /// of valid peers and any pings that need to be sent out into `out`.
     pub fn filterValidPeers(
         self: *Self,
-        allocator: std.mem.Allocator,
         our_keypair: KeyPair,
-        peers: []ContactInfo,
-    ) error{OutOfMemory}!struct { valid_peers: std.ArrayList(usize), pings: std.ArrayList(PingAndSocketAddr) } {
+        peers: []const ContactInfo,
+        out: struct {
+            valid_peers: *std.ArrayList(usize),
+            pings: *std.ArrayList(PingAndSocketAddr),
+        },
+    ) error{OutOfMemory}!void {
         const now = std.time.Instant.now() catch @panic("time not supported by OS!");
-        var valid_peers = std.ArrayList(usize).init(allocator);
-        var pings = std.ArrayList(PingAndSocketAddr).init(allocator);
+        const valid_peers = out.valid_peers;
+        const pings = out.pings;
+
+        valid_peers.clearRetainingCapacity();
+        pings.clearRetainingCapacity();
+
+        try valid_peers.ensureTotalCapacityPrecise(peers.len);
+        try pings.ensureTotalCapacityPrecise(peers.len);
 
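+        // The caller-owned output lists are cleared and then sized for the
+        // worst case (every peer valid, every peer pinged), so the appends in
+        // the loop below cannot fail.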
         for (peers, 0..) |*peer, i| {
             if (peer.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
                 const result = self.check(now, PubkeyAndSocketAddr{ .pubkey = peer.pubkey, .socket_addr = gossip_addr }, &our_keypair);
                 if (result.passes_ping_check) {
-                    try valid_peers.append(i);
+                    valid_peers.appendAssumeCapacity(i);
                 }
                 if (result.maybe_ping) |ping| {
-                    try pings.append(.{ .ping = ping, .socket = gossip_addr });
+                    pings.appendAssumeCapacity(.{ .ping = ping, .socket = gossip_addr });
                 }
             }
         }
-
-        return .{ .valid_peers = valid_peers, .pings = pings };
     }
 
     // only used in tests/benchmarks
@@ -269,9 +276,16 @@ test "gossip.ping_pong: PingCache works" {
     try testing.expect(!resp.passes_ping_check);
     try testing.expect(resp.maybe_ping != null);
 
-    var result = try ping_cache.filterValidPeers(testing.allocator, our_kp, &[_]ContactInfo{});
-    defer result.valid_peers.deinit();
-    defer result.pings.deinit();
+    var valid_peers = std.ArrayList(usize).init(std.testing.allocator);
+    defer valid_peers.deinit();
+
+    var pings = std.ArrayList(PingAndSocketAddr).init(std.testing.allocator);
+    defer pings.deinit();
+
+    try ping_cache.filterValidPeers(our_kp, &[_]ContactInfo{}, .{
+        .valid_peers = &valid_peers,
+        .pings = &pings,
+    });
 
     try testing.expect(ping != null);
 }
diff --git a/src/gossip/service.zig b/src/gossip/service.zig
index f8e5d75fe..6ce267bda 100644
--- a/src/gossip/service.zig
+++ b/src/gossip/service.zig
@@ -54,7 +54,7 @@
 const globalRegistry = @import("../prometheus/registry.zig").globalRegistry;
 const GetMetricError = @import("../prometheus/registry.zig").GetMetricError;
 const Counter = @import("../prometheus/counter.zig").Counter;
 
-const PacketBatch = ArrayList(Packet);
+const PacketBatch = std.ArrayListUnmanaged(Packet);
 const GossipMessageWithEndpoint = struct { from_endpoint: EndPoint, message: GossipMessage };
 
 pub const GOSSIP_PULL_RATE_MS: u64 = 5 * std.time.ms_per_s;
@@ -239,14 +239,14 @@ pub const GossipService = struct {
         {
             var buff_lock = self.packet_incoming_channel.buffer.lock();
             const buff: *std.ArrayList(PacketBatch) = buff_lock.mut();
-            for (buff.items) |*item| item.deinit();
+            for (buff.items) |*item| item.deinit(self.allocator);
             buff_lock.unlock();
             self.packet_incoming_channel.deinit();
         }
         {
             var buff_lock = self.packet_outgoing_channel.buffer.lock();
             const buff: *std.ArrayList(PacketBatch) = buff_lock.mut();
-            for (buff.items) |*item| item.deinit();
+            for (buff.items) |*item| item.deinit(self.allocator);
             buff_lock.unlock();
             self.packet_outgoing_channel.deinit();
         }
@@ -286,7 +286,9 @@
                 const maybe_thread: ?std.Thread = @field(handles, field.name);
                 const thread = maybe_thread orelse break :cont;
                 thread.join(); // if we end up joining, something's gone wrong, so signal exit
-                if (i == 0) handles.exit.store(true, .unordered);
+                if (i == 0) {
+                    handles.exit.store(true, .unordered);
+                }
             }
         }
     };
@@ -324,8 +326,8 @@
         }.exitAndJoin;
 
         const receiver_thread = try Thread.spawn(.{}, socket_utils.readSocket, .{
-            self.allocator,
             self.gossip_socket,
+            self.allocator,
             self.packet_incoming_channel,
             self.exit,
             self.logger,
@@ -345,6 +347,7 @@
         const responder_thread = try Thread.spawn(.{}, socket_utils.sendSocket, .{
             self.gossip_socket,
+            self.allocator,
             self.packet_outgoing_channel,
             self.exit,
             self.logger,
@@ -455,12 +458,12 @@
         // verify in parallel using the threadpool
         // PERF: investigate CPU pinning
        var task_search_start_idx: usize = 0;
-        for (packet_batches) |packet_batch| {
+        for (packet_batches) |*packet_batch| {
             const acquired_task_idx = VerifyMessageTask.awaitAndAcquireFirstAvailableTask(tasks, task_search_start_idx);
             task_search_start_idx = (acquired_task_idx + 1) % tasks.len;
 
             const task_ptr = &tasks[acquired_task_idx];
-            task_ptr.entry.packet_batch = packet_batch;
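+            // the verify tasks still operate on a managed list, so re-attach
+            // the allocator to the unmanaged batch (this copies no packets)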
+            task_ptr.entry.packet_batch = packet_batch.toManaged(self.allocator);
             task_ptr.result catch |err| self.logger.errf("VerifyMessageTask encountered error: {s}", .{@errorName(err)});
 
             const batch = Batch.from(&task_ptr.task);
@@ -743,7 +746,7 @@
             if (ping_messages.items.len > 0) {
                 var x_timer = std.time.Timer.start() catch unreachable;
-                self.handleBatchPingMessages(&ping_messages) catch |err| {
+                self.handleBatchPingMessages(ping_messages.items) catch |err| {
                     self.logger.errf("handleBatchPingMessages failed: {}", .{err});
                 };
                 const elapsed = x_timer.read();
@@ -811,26 +814,24 @@
             if (top_of_loop_ts - last_pull_req_ts > GOSSIP_PULL_RATE_MS) pull_blk: {
                 defer last_pull_req_ts = getWallclockMs();
                 // this also includes sending ping messages to other peers
-                const packets = self.buildPullRequests(
+                var packets = self.buildPullRequests(
                     pull_request.MAX_BLOOM_SIZE,
                 ) catch |e| {
                     self.logger.errf("failed to generate pull requests: {any}", .{e});
                     break :pull_blk;
                 };
                 self.stats.pull_requests_sent.add(packets.items.len);
-                try self.packet_outgoing_channel.send(packets);
+                try self.packet_outgoing_channel.send(packets.moveToUnmanaged());
             }
 
             // new push msgs
             self.drainPushQueueToGossipTable(getWallclockMs());
-            const maybe_push_packets = self.buildPushMessages(&push_cursor) catch |e| blk: {
-                self.logger.errf("failed to generate push messages: {any}", .{e});
-                break :blk null;
-            };
-            if (maybe_push_packets) |push_packets| {
+            if (self.buildPushMessages(self.allocator, &push_cursor)) |push_packets| {
+                defer push_packets.deinit();
                 self.stats.push_messages_sent.add(push_packets.items.len);
-                try self.packet_outgoing_channel.sendBatch(push_packets);
-                push_packets.deinit();
+                try self.packet_outgoing_channel.sendBatch(push_packets.items);
+            } else |e| {
+                self.logger.errf("failed to generate push messages: {any}", .{e});
             }
 
             // trim data
@@ -900,17 +901,22 @@
         var buf: [NUM_ACTIVE_SET_ENTRIES]ContactInfo = undefined;
         const gossip_peers = try self.getGossipNodes(&buf, NUM_ACTIVE_SET_ENTRIES, now);
 
+        var valid_gossip_indexs = std.ArrayList(usize).init(self.allocator);
+        defer valid_gossip_indexs.deinit();
+
+        var pings_to_send_out = std.ArrayList(PingAndSocketAddr).init(self.allocator);
+        defer pings_to_send_out.deinit();
+
         // filter out peers who have responded to pings
-        const ping_cache_result = blk: {
+        {
             var ping_cache_lock = self.ping_cache_rw.write();
             defer ping_cache_lock.unlock();
-            var ping_cache: *PingCache = ping_cache_lock.mut();
-
-            const result = try ping_cache.filterValidPeers(self.allocator, self.my_keypair, gossip_peers);
-            break :blk result;
-        };
-        var valid_gossip_indexs = ping_cache_result.valid_peers;
-        defer valid_gossip_indexs.deinit();
+            const ping_cache: *PingCache = ping_cache_lock.mut();
+            try ping_cache.filterValidPeers(self.my_keypair, gossip_peers, .{
+                .valid_peers = &valid_gossip_indexs,
+                .pings = &pings_to_send_out,
+            });
+        }
 
         var valid_gossip_peers: [NUM_ACTIVE_SET_ENTRIES]ContactInfo = undefined;
         for (0.., valid_gossip_indexs.items) |i, valid_gossip_index| {
@@ -918,9 +924,7 @@
         }
 
         // send pings to peers
-        var pings_to_send_out = ping_cache_result.pings;
-        defer pings_to_send_out.deinit();
-        try self.sendPings(pings_to_send_out);
+        try self.sendPings(pings_to_send_out.items);
 
         // reset push active set
         var active_set_lock = self.active_set_rw.write();
@@ -931,7 +935,8 @@
 
     /// logic for building new push messages which are sent to peers from the
     /// active set and serialized into packets.
-    fn buildPushMessages(self: *Self, push_cursor: *u64) !ArrayList(ArrayList(Packet)) {
+    /// NOTE: Caller must deinitialize the elements of the returned `ArrayList` with `allocator`.
+    fn buildPushMessages(self: *Self, allocator: std.mem.Allocator, push_cursor: *u64) !ArrayList(std.ArrayListUnmanaged(Packet)) {
         // TODO: find a better static value?
         var buf: [512]gossip.GossipVersionedData = undefined;
@@ -943,7 +948,7 @@
             break :blk gossip_table.getEntriesWithCursor(&buf, push_cursor);
         };
 
-        var packet_batch = ArrayList(ArrayList(Packet)).init(self.allocator);
+        var packet_batch = ArrayList(std.ArrayListUnmanaged(Packet)).init(allocator);
         errdefer packet_batch.deinit();
 
         if (gossip_entries.len == 0) {
@@ -956,11 +961,10 @@
         // find new values in gossip table
         // TODO: benchmark different approach of HashMapping(origin, value) first
         // then deriving the active set per origin in a batch
-        var push_messages = std.AutoHashMap(EndPoint, ArrayList(SignedGossipData)).init(self.allocator);
+        var push_messages = std.AutoArrayHashMap(EndPoint, std.ArrayListUnmanaged(SignedGossipData)).init(self.allocator);
         defer {
-            var push_iter = push_messages.iterator();
-            while (push_iter.next()) |push_entry| {
-                push_entry.value_ptr.deinit();
+            for (push_messages.values()) |*push_value| {
+                push_value.deinit(self.allocator);
             }
             push_messages.deinit();
         }
@@ -971,7 +975,9 @@
         var active_set: *const ActiveSet = active_set_lock.get();
         defer active_set_lock.unlock();
 
-        if (active_set.len() == 0) return packet_batch;
+        if (active_set.len() == 0) {
+            return packet_batch;
+        }
 
         for (gossip_entries) |entry| {
             const value = entry.value;
@@ -1003,13 +1009,13 @@
             defer active_set_peers.deinit();
 
             for (active_set_peers.items) |peer| {
-                const maybe_peer_entry = push_messages.getEntry(peer);
-                if (maybe_peer_entry) |peer_entry| {
-                    try peer_entry.value_ptr.append(value);
+                if (push_messages.getEntry(peer)) |peer_entry| {
+                    try peer_entry.value_ptr.append(self.allocator, value);
                 } else {
-                    var peer_entry = try ArrayList(SignedGossipData).initCapacity(self.allocator, 1);
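+                    // reserve the map slot first so putAssumeCapacity below
+                    // cannot fail and leak the freshly-created peer_entry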
+                    try push_messages.ensureUnusedCapacity(1);
+                    var peer_entry = try std.ArrayListUnmanaged(SignedGossipData).initCapacity(self.allocator, 1);
                     peer_entry.appendAssumeCapacity(value);
-                    try push_messages.put(peer, peer_entry);
+                    push_messages.putAssumeCapacity(peer, peer_entry);
                 }
             }
             num_values_considered += 1;
@@ -1021,21 +1027,18 @@
         const num_values_not_considered = gossip_entries.len - num_values_considered;
         push_cursor.* -= num_values_not_considered;
 
-        var push_iter = push_messages.iterator();
-        while (push_iter.next()) |push_entry| {
-            const gossip_values: *const ArrayList(SignedGossipData) = push_entry.value_ptr;
-            const to_endpoint: *const EndPoint = push_entry.key_ptr;
-
+        try packet_batch.ensureUnusedCapacity(push_messages.count());
+        for (push_messages.keys(), push_messages.values()) |*to_endpoint, *gossip_values| {
             // send the values as a pull response
-            const packets = try gossipDataToPackets(
-                self.allocator,
+            var packets = try gossipDataToPackets(
+                allocator,
                 &self.my_pubkey,
                 gossip_values.items,
                 to_endpoint,
-                ChunkType.PushMessage,
+                .PushMessage,
             );
             if (packets.items.len > 0) {
-                try packet_batch.append(packets);
+                packet_batch.appendAssumeCapacity(packets.moveToUnmanaged());
             }
         }
         return packet_batch;
     }
@@ -1070,25 +1073,29 @@
             entrypoint_index = @intCast(maybe_entrypoint_index);
         }
 
+        var valid_gossip_peer_indexes = std.ArrayList(usize).init(self.allocator);
+        defer valid_gossip_peer_indexes.deinit();
+
         // filter out peers who have responded to pings
-        const ping_cache_result = blk: {
+        {
             var ping_cache_lock = self.ping_cache_rw.write();
             defer ping_cache_lock.unlock();
-            var ping_cache: *PingCache = ping_cache_lock.mut();
+            const ping_cache: *PingCache = ping_cache_lock.mut();
 
-            const result = try ping_cache.filterValidPeers(self.allocator, self.my_keypair, peers);
-            break :blk result;
-        };
-        var valid_gossip_peer_indexs = ping_cache_result.valid_peers;
-        defer valid_gossip_peer_indexs.deinit();
+            // send pings to peers
+            var pings_to_send_out = std.ArrayList(PingAndSocketAddr).init(self.allocator);
+            defer pings_to_send_out.deinit();
 
-        // send pings to peers
-        var pings_to_send_out = ping_cache_result.pings;
-        defer pings_to_send_out.deinit();
-        try self.sendPings(pings_to_send_out);
+            try ping_cache.filterValidPeers(self.my_keypair, peers, .{
+                .valid_peers = &valid_gossip_peer_indexes,
+                .pings = &pings_to_send_out,
+            });
+
+            try self.sendPings(pings_to_send_out.items);
+        }
 
         const should_send_to_entrypoint = entrypoint_index != -1;
-        const num_peers = valid_gossip_peer_indexs.items.len;
+        const num_peers = valid_gossip_peer_indexes.items.len;
 
         if (num_peers == 0 and !should_send_to_entrypoint) {
             return error.NoPeers;
@@ -1133,7 +1140,7 @@
         for (filters.items) |filter_i| {
             // TODO: incorperate stake weight in random sampling
             const peer_index = rng.random().intRangeAtMost(usize, 0, num_peers - 1);
-            const peer_contact_info_index = valid_gossip_peer_indexs.items[peer_index];
+            const peer_contact_info_index = valid_gossip_peer_indexes.items[peer_index];
             const peer_contact_info = peers[peer_contact_info_index];
             if (peer_contact_info.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
                 const message = GossipMessage{ .PullRequest = .{ filter_i, my_contact_info_value } };
@@ -1164,7 +1171,8 @@
         return packet_batch;
     }
 
-    const PullRequestTask = struct {
+    const PullRequestTask = ThreadPoolTask(PullRequestEntry);
+    const PullRequestEntry = struct {
         allocator: std.mem.Allocator,
         my_pubkey: *const Pubkey,
         from_endpoint: *const EndPoint,
@@ -1175,19 +1183,13 @@
         output_limit: *std.atomic.Value(i64),
         output_consumed: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
 
-        task: Task,
-        done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
-
-        pub fn deinit(this: *PullRequestTask) void {
+        pub fn deinit(this: *PullRequestEntry) void {
             if (!this.output_consumed.load(.acquire)) {
                 this.output.deinit();
             }
         }
 
-        pub fn callback(task: *Task) void {
-            var self: *@This() = @fieldParentPtr("task", task);
-            defer self.done.store(true, .release);
-
+        pub fn callback(self: *PullRequestEntry) void {
             const output_limit = self.output_limit.load(.unordered);
             if (output_limit <= 0) {
                 return;
             }
@@ -1248,7 +1250,7 @@
             }
         }
 
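+        // only answer requesters that have recently responded to a ping;
+        // everyone else gets a ping instead of a pull response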
-        var valid_indexs = blk: {
+        const valid_peer_indexes = blk: {
             var ping_cache_lock = self.ping_cache_rw.write();
             defer ping_cache_lock.unlock();
             var ping_cache: *PingCache = ping_cache_lock.mut();
@@ -1272,42 +1274,47 @@
             }
         }
 
-            const result = try ping_cache.filterValidPeers(self.allocator, self.my_keypair, peers.items);
-            defer result.pings.deinit();
-            try self.sendPings(result.pings);
+            var valid_peer_indexes = std.ArrayList(usize).init(self.allocator);
+            errdefer valid_peer_indexes.deinit();
+
+            var pings = std.ArrayList(PingAndSocketAddr).init(self.allocator);
+            defer pings.deinit();
 
-            break :blk result.valid_peers;
+            try ping_cache.filterValidPeers(self.my_keypair, peers.items, .{
+                .valid_peers = &valid_peer_indexes,
+                .pings = &pings,
+            });
+            try self.sendPings(pings.items);
+
+            break :blk valid_peer_indexes;
         };
-        defer valid_indexs.deinit();
+        defer valid_peer_indexes.deinit();
 
-        if (valid_indexs.items.len == 0) {
+        if (valid_peer_indexes.items.len == 0) {
             return;
         }
 
         // create the pull requests
-        const n_valid_requests = valid_indexs.items.len;
-
-        const tasks = try self.allocator.alloc(PullRequestTask, n_valid_requests);
-        defer {
-            for (tasks) |*task| task.deinit();
-            self.allocator.free(tasks);
-        }
+        const n_valid_requests = valid_peer_indexes.items.len;
+        const tasks = try PullRequestTask.init(self.allocator, n_valid_requests);
+        errdefer self.allocator.free(tasks);
 
         {
+            errdefer @compileError("Must not fail before the defer to properly deinit the tasks");
+
             var gossip_table_lock = self.gossip_table_rw.read();
             const gossip_table: *const GossipTable = gossip_table_lock.get();
             defer gossip_table_lock.unlock();
 
             var output_limit = std.atomic.Value(i64).init(MAX_NUM_VALUES_PULL_RESPONSE);
 
-            for (valid_indexs.items, 0..) |i, task_index| {
+            for (valid_peer_indexes.items, tasks) |peer_idx, *task| {
                 // create the thread task
-                tasks[task_index] = PullRequestTask{
-                    .task = .{ .callback = PullRequestTask.callback },
+                task.entry = .{
                     .my_pubkey = &self.my_pubkey,
-                    .from_endpoint = &pull_requests.items[i].from_endpoint,
-                    .filter = &pull_requests.items[i].filter,
-                    .value = &pull_requests.items[i].value,
+                    .from_endpoint = &pull_requests.items[peer_idx].from_endpoint,
+                    .filter = &pull_requests.items[peer_idx].filter,
+                    .value = &pull_requests.items[peer_idx].value,
                     .gossip_table = gossip_table,
                     .output = ArrayList(Packet).init(self.allocator),
                     .allocator = self.allocator,
                 };
@@ -1315,25 +1322,30 @@
 
                 // run it
-                const batch = Batch.from(&tasks[task_index].task);
+                const batch = Batch.from(&task.task);
                 self.thread_pool.schedule(batch);
             }
 
             // wait for them to be done to release the lock
             for (tasks) |*task| {
-                while (!task.done.load(.acquire)) {
-                    // wait
-                }
+                task.blockUntilCompletion();
             }
         }
 
+        defer for (tasks) |*task| {
+            task.entry.deinit();
+        };
+
         for (tasks) |*task| {
-            if (task.output.items.len > 0) {
-                self.stats.pull_responses_sent.add(1);
-                // TODO: should only need one mux lock in this loop
-                try self.packet_outgoing_channel.send(task.output);
-                task.output_consumed.store(true, .release);
-            }
+            if (task.entry.output.items.len == 0) continue;
+
+            self.stats.pull_responses_sent.add(1);
+            // TODO: should only need one mux lock in this loop
+
+            var packet_batch_unmanaged = task.entry.output.moveToUnmanaged();
+            errdefer packet_batch_unmanaged.deinit(self.allocator);
+
+            try self.packet_outgoing_channel.send(packet_batch_unmanaged);
+            task.entry.output_consumed.store(true, .release);
         }
     }
@@ -1358,20 +1370,18 @@
     pub fn handleBatchPingMessages(
         self: *Self,
-        ping_messages: *const ArrayList(PingMessage),
+        ping_messages: []const PingMessage,
     ) !void {
-        const n_ping_messages = ping_messages.items.len;
-
-        // init a new batch of pong responses
-        var ping_packet_batch = try ArrayList(Packet).initCapacity(self.allocator, n_ping_messages);
-        ping_packet_batch.appendNTimesAssumeCapacity(Packet.default(), n_ping_messages);
-        errdefer ping_packet_batch.deinit();
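+        // exactly one pong response per ping; the batch is a plain slice whose
+        // ownership is handed to the outgoing channel at the end of this function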
+        const ping_packet_batch = try self.allocator.alloc(Packet, ping_messages.len);
+        errdefer self.allocator.free(ping_packet_batch);
+        @memset(ping_packet_batch, Packet.default());
 
-        for (ping_messages.items, 0..) |*ping_message, i| {
-            const pong = try Pong.init(ping_message.ping, &self.my_keypair);
-            const pong_message = GossipMessage{ .PongMessage = pong };
+        for (ping_packet_batch, ping_messages) |*packet, *ping_message| {
+            const pong_message: GossipMessage = .{
+                .PongMessage = try Pong.init(ping_message.ping, &self.my_keypair),
+            };
 
-            var packet = &ping_packet_batch.items[i];
             const bytes_written = try bincode.writeToSlice(
                 &packet.data,
                 pong_message,
@@ -1388,8 +1398,8 @@
                 .field("from_pubkey", &ping_message.ping.from.string())
                 .debug("gossip: recv ping");
         }
-        self.stats.pong_messages_sent.add(n_ping_messages);
-        try self.packet_outgoing_channel.send(ping_packet_batch);
+        self.stats.pong_messages_sent.add(ping_messages.len);
+        try self.packet_outgoing_channel.send(std.ArrayListUnmanaged(Packet).fromOwnedSlice(ping_packet_batch));
     }
 
     /// logic for handling a pull response message.
@@ -1675,6 +1685,7 @@
         if (n_packets == 0) return;
 
         var prune_packet_batch = try ArrayList(Packet).initCapacity(self.allocator, n_packets);
+        errdefer prune_packet_batch.deinit();
         prune_packet_batch.appendNTimesAssumeCapacity(Packet.default(), n_packets);
         var count: usize = 0;
@@ -1701,7 +1712,7 @@
                 .field("to_addr", &from_pubkey.string())
                 .debug("gossip: send prune_message");
 
-            var packet = &prune_packet_batch.items[count];
+            const packet = &prune_packet_batch.items[count];
             const written_slice = bincode.writeToSlice(&packet.data, msg, bincode.Params{}) catch unreachable;
             packet.size = written_slice.len;
             packet.addr = from_endpoint;
@@ -1709,7 +1720,10 @@
         }
 
         self.stats.prune_messages_sent.add(n_packets);
-        try self.packet_outgoing_channel.send(prune_packet_batch);
+
+        var packet_batch_unmanaged = prune_packet_batch.moveToUnmanaged();
+        errdefer packet_batch_unmanaged.deinit(self.allocator);
+        try self.packet_outgoing_channel.send(packet_batch_unmanaged);
     }
 
     /// removes old values from the gossip table and failed pull hashes struct
@@ -1811,27 +1825,27 @@
     /// serializes a list of ping messages into Packets and sends them out
     pub fn sendPings(
-        self: *Self,
-        pings: ArrayList(PingAndSocketAddr),
+        self: *const Self,
+        pings: []const PingAndSocketAddr,
     ) error{ OutOfMemory, ChannelClosed, SerializationError }!void {
-        const n_pings = pings.items.len;
-        if (n_pings == 0) return;
-
-        var packet_batch = try ArrayList(Packet).initCapacity(self.allocator, n_pings);
-        errdefer packet_batch.deinit();
-        packet_batch.appendNTimesAssumeCapacity(Packet.default(), n_pings);
+        if (pings.len == 0) return;
 
-        for (pings.items, 0..) |ping_and_addr, i| {
-            const message = GossipMessage{ .PingMessage = ping_and_addr.ping };
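+        // serialize the pings into a bare slice; fromOwnedSlice below hands
+        // ownership to the channel without copying the packets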
+        const packet_batch = try self.allocator.alloc(Packet, pings.len);
+        errdefer self.allocator.free(packet_batch);
+        @memset(packet_batch, Packet.default());
 
-            var packet = &packet_batch.items[i];
+        for (packet_batch, pings) |*packet, ping_and_addr| {
+            const message: GossipMessage = .{ .PingMessage = ping_and_addr.ping };
             const serialized_ping = bincode.writeToSlice(&packet.data, message, .{}) catch return error.SerializationError;
             packet.size = serialized_ping.len;
             packet.addr = ping_and_addr.socket.toEndpoint();
         }
 
-        self.stats.ping_messages_sent.add(n_pings);
-        try self.packet_outgoing_channel.send(packet_batch);
+        self.stats.ping_messages_sent.add(pings.len);
+        try self.packet_outgoing_channel.send(std.ArrayListUnmanaged(Packet).fromOwnedSlice(packet_batch));
     }
 
     /// returns a list of valid gossip nodes. this works by reading
@@ -2095,9 +2109,10 @@ pub fn gossipDataToPackets(
     gossip_values: []SignedGossipData,
     to_endpoint: *const EndPoint,
     chunk_type: ChunkType,
-) error{ OutOfMemory, SerializationError }!ArrayList(Packet) {
-    if (gossip_values.len == 0)
+) error{ OutOfMemory, SerializationError }!std.ArrayList(Packet) {
+    if (gossip_values.len == 0) {
         return ArrayList(Packet).init(allocator);
+    }
 
     const indexs = try chunkValuesIntoPacketIndexes(
         allocator,
@@ -2107,7 +2122,6 @@
     defer indexs.deinit();
 
     var chunk_iter = std.mem.window(usize, indexs.items, 2, 1);
-    var packet_buf: [PACKET_DATA_SIZE]u8 = undefined;
 
     var packets = try ArrayList(Packet).initCapacity(allocator, indexs.items.len -| 1);
     errdefer packets.deinit();
@@ -2116,10 +2130,12 @@
         const end_index = window[1];
         const values = gossip_values[start_index..end_index];
 
-        const message = switch (chunk_type) {
-            .PushMessage => GossipMessage{ .PushMessage = .{ my_pubkey.*, values } },
-            .PullResponse => GossipMessage{ .PullResponse = .{ my_pubkey.*, values } },
+        const message: GossipMessage = switch (chunk_type) {
+            .PushMessage => .{ .PushMessage = .{ my_pubkey.*, values } },
+            .PullResponse => .{ .PullResponse = .{ my_pubkey.*, values } },
         };
+
+        var packet_buf: [PACKET_DATA_SIZE]u8 = undefined;
         const msg_slice = bincode.writeToSlice(&packet_buf, message, bincode.Params{}) catch {
             return error.SerializationError;
         };
@@ -2639,15 +2655,24 @@ test "gossip.service: test build push messages" {
     clg.unlock();
 
     var cursor: u64 = 0;
-    var msgs = try gossip_service.buildPushMessages(&cursor);
-    try std.testing.expectEqual(cursor, 11);
-    try std.testing.expect(msgs.items.len > 0);
-    for (msgs.items) |*msg| msg.deinit();
-    msgs.deinit();
-
-    const msgs2 = try gossip_service.buildPushMessages(&cursor);
-    try std.testing.expectEqual(cursor, 11);
-    try std.testing.expect(msgs2.items.len == 0);
+
+    {
+        var msgs = try gossip_service.buildPushMessages(allocator, &cursor);
+        defer msgs.deinit();
+        defer for (msgs.items) |*msg| msg.deinit(allocator);
+
+        try std.testing.expectEqual(cursor, 11);
+        try std.testing.expect(msgs.items.len > 0);
+    }
+
+    {
+        var msgs2 = try gossip_service.buildPushMessages(allocator, &cursor);
+        defer msgs2.deinit();
+        defer for (msgs2.items) |*msg| msg.deinit(allocator);
+
+        try std.testing.expectEqual(cursor, 11);
+        try std.testing.expect(msgs2.items.len == 0);
+    }
 }
 
"gossip.gossip_service: test packet verification" { @@ -2670,8 +2695,8 @@ test "gossip.gossip_service: test packet verification" { ); defer gossip_service.deinit(); - var packet_channel = gossip_service.packet_incoming_channel; - var verified_channel = gossip_service.verified_incoming_channel; + const packet_channel = gossip_service.packet_incoming_channel; + const verified_channel = gossip_service.verified_incoming_channel; const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{&gossip_service}); @@ -2691,68 +2716,77 @@ test "gossip.gossip_service: test packet verification" { var peer = SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 0); const from = peer.toEndpoint(); - var buf = [_]u8{0} ** PACKET_DATA_SIZE; - const out = try bincode.writeToSlice(buf[0..], message, bincode.Params{}); - const packet = Packet.init(from, buf, out.len); - var packet_batch = ArrayList(Packet).init(allocator); - for (0..3) |_| { - try packet_batch.append(packet); - } - try packet_channel.send(packet_batch); + { + var buf = [_]u8{0} ** PACKET_DATA_SIZE; + const out = try bincode.writeToSlice(buf[0..], message, bincode.Params{}); + const packet = Packet.init(from, buf, out.len); - var packet_batch_2 = ArrayList(Packet).init(allocator); + var packet_batch: std.ArrayListUnmanaged(Packet) = .{}; + errdefer packet_batch.deinit(allocator); - // send one which fails sanitization - var value_v2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 2), &keypair); - value_v2.data.EpochSlots[0] = gossip.MAX_EPOCH_SLOTS; - var values_v2 = [_]gossip.SignedGossipData{value_v2}; - const message_v2 = GossipMessage{ - .PushMessage = .{ id, &values_v2 }, - }; - var buf_v2 = [_]u8{0} ** PACKET_DATA_SIZE; - const out_v2 = try bincode.writeToSlice(buf_v2[0..], message_v2, bincode.Params{}); - const packet_v2 = Packet.init(from, buf_v2, out_v2.len); - try packet_batch_2.append(packet_v2); - - // send one with a incorrect signature - var rand_keypair = try KeyPair.create([_]u8{3} ** 32); - const value2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 0), &rand_keypair); - var values2 = [_]gossip.SignedGossipData{value2}; - const message2 = GossipMessage{ - .PushMessage = .{ id, &values2 }, - }; - var buf2 = [_]u8{0} ** PACKET_DATA_SIZE; - const out2 = try bincode.writeToSlice(buf2[0..], message2, bincode.Params{}); - const packet2 = Packet.init(from, buf2, out2.len); - try packet_batch_2.append(packet2); + try packet_batch.appendNTimes(allocator, packet, 3); + try packet_channel.send(packet_batch); + } - // send it with a SignedGossipData which hash a slice { - const rand_pubkey = Pubkey.fromPublicKey(&rand_keypair.public_key); - var dshred = gossip.DuplicateShred.random(rng.random()); - var chunk: [32]u8 = .{1} ** 32; - dshred.chunk = &chunk; - dshred.wallclock = 1714155765121; - dshred.slot = 16592333628234015598; - dshred.shred_index = 3853562894; - dshred.shred_type = gossip.ShredType.Data; - dshred.num_chunks = 99; - dshred.chunk_index = 69; - dshred.from = rand_pubkey; - const dshred_data = gossip.GossipData{ - .DuplicateShred = .{ 1, dshred }, + var packet_batch_2 = ArrayList(Packet).init(allocator); + errdefer packet_batch_2.deinit(); + + // send one which fails sanitization + var value_v2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 2), &keypair); + value_v2.data.EpochSlots[0] = gossip.MAX_EPOCH_SLOTS; + var values_v2 = [_]gossip.SignedGossipData{value_v2}; + const message_v2 = GossipMessage{ + .PushMessage 
= .{ id, &values_v2 }, }; - const dshred_value = try SignedGossipData.initSigned(dshred_data, &rand_keypair); - var values3 = [_]gossip.SignedGossipData{dshred_value}; - const message3 = GossipMessage{ - .PushMessage = .{ id, &values3 }, + var buf_v2 = [_]u8{0} ** PACKET_DATA_SIZE; + const out_v2 = try bincode.writeToSlice(buf_v2[0..], message_v2, bincode.Params{}); + const packet_v2 = Packet.init(from, buf_v2, out_v2.len); + try packet_batch_2.append(packet_v2); + + // send one with a incorrect signature + var rand_keypair = try KeyPair.create([_]u8{3} ** 32); + const value2 = try SignedGossipData.initSigned(gossip.GossipData.randomFromIndex(rng.random(), 0), &rand_keypair); + var values2 = [_]gossip.SignedGossipData{value2}; + const message2 = GossipMessage{ + .PushMessage = .{ id, &values2 }, }; - var buf3 = [_]u8{0} ** PACKET_DATA_SIZE; - const out3 = try bincode.writeToSlice(buf3[0..], message3, bincode.Params{}); - const packet3 = Packet.init(from, buf3, out3.len); - try packet_batch_2.append(packet3); + var buf2 = [_]u8{0} ** PACKET_DATA_SIZE; + const out2 = try bincode.writeToSlice(buf2[0..], message2, bincode.Params{}); + const packet2 = Packet.init(from, buf2, out2.len); + try packet_batch_2.append(packet2); + + // send it with a SignedGossipData which hash a slice + { + const rand_pubkey = Pubkey.fromPublicKey(&rand_keypair.public_key); + var dshred = gossip.DuplicateShred.random(rng.random()); + var chunk: [32]u8 = .{1} ** 32; + dshred.chunk = &chunk; + dshred.wallclock = 1714155765121; + dshred.slot = 16592333628234015598; + dshred.shred_index = 3853562894; + dshred.shred_type = gossip.ShredType.Data; + dshred.num_chunks = 99; + dshred.chunk_index = 69; + dshred.from = rand_pubkey; + const dshred_data = gossip.GossipData{ + .DuplicateShred = .{ 1, dshred }, + }; + const dshred_value = try SignedGossipData.initSigned(dshred_data, &rand_keypair); + var values3 = [_]gossip.SignedGossipData{dshred_value}; + const message3 = GossipMessage{ + .PushMessage = .{ id, &values3 }, + }; + var buf3 = [_]u8{0} ** PACKET_DATA_SIZE; + const out3 = try bincode.writeToSlice(buf3[0..], message3, bincode.Params{}); + const packet3 = Packet.init(from, buf3, out3.len); + try packet_batch_2.append(packet3); + } + + var packet_batch_2_unmanaged = packet_batch_2.moveToUnmanaged(); + errdefer packet_batch_2_unmanaged.deinit(allocator); + try packet_channel.send(packet_batch_2_unmanaged); } - try packet_channel.send(packet_batch_2); var msg_count: usize = 0; var attempt_count: usize = 0; @@ -2868,7 +2902,7 @@ test "gossip.gossip_service: process contact info push packet" { const resp = (try responder_channel.try_drain()).?; defer { for (resp) |*packet_batch| { - packet_batch.deinit(); + packet_batch.deinit(allocator); } responder_channel.allocator.free(resp); } @@ -3013,6 +3047,7 @@ pub const BenchmarkGossipServiceGeneral = struct { bench_args.message_counts.n_push_message + bench_args.message_counts.n_pull_response, ); + errdefer packet_batch.deinit(); for (0..bench_args.message_counts.n_ping) |_| { // send a ping message @@ -3044,8 +3079,12 @@ pub const BenchmarkGossipServiceGeneral = struct { try packet_batch.appendSlice(packets.items); } - // send all messages in one go - try outgoing_channel.send(packet_batch); + { + var packet_batch_unmanaged = packet_batch.moveToUnmanaged(); + errdefer packet_batch_unmanaged.deinit(allocator); + // send all messages in one go + try outgoing_channel.send(packet_batch_unmanaged); + } // wait for all messages to be processed var timer = try std.time.Timer.start(); 
@@ -3168,7 +3207,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
         const outgoing_channel = gossip_service.packet_incoming_channel;
 
         // generate messages
-        var packet_batch = try ArrayList(Packet).initCapacity(
+        var packet_batch = try std.ArrayListUnmanaged(Packet).initCapacity(
             allocator,
             bench_args.n_pull_requests,
         );
diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig
index 8807d23b9..bd129be56 100644
--- a/src/net/socket_utils.zig
+++ b/src/net/socket_utils.zig
@@ -11,20 +11,22 @@
 pub const SOCKET_TIMEOUT_US: usize = 1 * std.time.us_per_s;
 pub const PACKETS_PER_BATCH: usize = 64;
 
 pub fn readSocket(
-    allocator: std.mem.Allocator,
     socket_: UdpSocket,
-    incoming_channel: *Channel(std.ArrayList(Packet)),
+    /// Allocator in charge of allocating & deallocating each packet batch in the `incoming_channel`.
+    packet_batches_allocator: std.mem.Allocator,
+    incoming_channel: *Channel(std.ArrayListUnmanaged(Packet)),
     exit: *const std.atomic.Value(bool),
     logger: Logger,
 ) !void {
-    // NOTE: we set to non-blocking to periodically check if we should exit
     var socket = socket_;
+
+    // NOTE: we set to non-blocking to periodically check if we should exit
     try socket.setReadTimeout(SOCKET_TIMEOUT_US);
 
     inf_loop: while (!exit.load(.unordered)) {
         // init a new batch
         var packet_batch = try std.ArrayList(Packet).initCapacity(
-            allocator,
+            packet_batches_allocator,
             PACKETS_PER_BATCH,
         );
         errdefer packet_batch.deinit();
@@ -51,7 +53,10 @@
         }
 
         packet_batch.shrinkAndFree(packet_batch.items.len);
-        try incoming_channel.send(packet_batch);
+
+        var packet_batch_unmanaged = packet_batch.moveToUnmanaged();
+        errdefer packet_batch_unmanaged.deinit(packet_batches_allocator);
+        try incoming_channel.send(packet_batch_unmanaged);
     }
 
     logger.debugf("readSocket loop closed", .{});
@@ -59,7 +64,9 @@
 
 pub fn sendSocket(
     socket: UdpSocket,
-    outgoing_channel: *Channel(std.ArrayList(Packet)),
+    /// Allocator in charge of allocating & deallocating each packet batch in the `outgoing_channel`.
+    packet_batches_allocator: std.mem.Allocator,
+    outgoing_channel: *Channel(std.ArrayListUnmanaged(Packet)),
     exit: *const std.atomic.Value(bool),
     logger: Logger,
 ) error{ SocketSendError, OutOfMemory, ChannelClosed }!void {
@@ -74,7 +81,7 @@
         };
         defer {
             for (packet_batches) |*packet_batch| {
-                packet_batch.deinit();
+                packet_batch.deinit(packet_batches_allocator);
             }
             outgoing_channel.allocator.free(packet_batches);
         }
@@ -100,27 +107,37 @@
 /// was initialized. While you *could* send data to the channel for a "receiver"
 /// socket, the underlying thread won't actually read the data from the channel.
 pub const SocketThread = struct {
-    channel: *Channel(std.ArrayList(Packet)),
+    channel: *Channel(std.ArrayListUnmanaged(Packet)),
     exit: *std.atomic.Value(bool),
     handle: std.Thread,
 
     const Self = @This();
 
-    pub fn initSender(allocator: Allocator, logger: Logger, socket: UdpSocket, exit: *Atomic(bool)) !Self {
-        const channel = Channel(std.ArrayList(Packet)).init(allocator, 0);
+    pub fn initSender(
+        allocator: Allocator,
+        logger: Logger,
+        socket: UdpSocket,
+        exit: *Atomic(bool),
+    ) !Self {
+        const channel = Channel(std.ArrayListUnmanaged(Packet)).init(allocator, 0);
         return .{
             .channel = channel,
             .exit = exit,
-            .handle = try std.Thread.spawn(.{}, sendSocket, .{ socket, channel, exit, logger }),
+            .handle = try std.Thread.spawn(.{}, sendSocket, .{ socket, allocator, channel, exit, logger }),
         };
     }
 
-    pub fn initReceiver(allocator: Allocator, logger: Logger, socket: UdpSocket, exit: *Atomic(bool)) !Self {
-        const channel = Channel(std.ArrayList(Packet)).init(allocator, 0);
+    pub fn initReceiver(
+        allocator: Allocator,
+        logger: Logger,
+        socket: UdpSocket,
+        exit: *Atomic(bool),
+    ) !Self {
+        const channel = Channel(std.ArrayListUnmanaged(Packet)).init(allocator, 0);
         return .{
             .channel = channel,
             .exit = exit,
-            .handle = try std.Thread.spawn(.{}, readSocket, .{ allocator, socket, channel, exit, logger }),
+            .handle = try std.Thread.spawn(.{}, readSocket, .{ socket, allocator, channel, exit, logger }),
         };
     }
@@ -151,7 +168,7 @@
         const n_packets = bench_args.n_packets;
         const allocator = std.heap.page_allocator;
 
-        var channel = Channel(std.ArrayList(Packet)).init(allocator, n_packets);
+        const channel = Channel(std.ArrayListUnmanaged(Packet)).init(allocator, n_packets);
         defer channel.deinit();
 
         var socket = try UdpSocket.create(.ipv4, .udp);
@@ -162,8 +179,8 @@
 
         var exit = std.atomic.Value(bool).init(false);
 
-        var handle = try std.Thread.spawn(.{}, readSocket, .{ allocator, socket, channel, &exit, .noop });
-        var recv_handle = try std.Thread.spawn(.{}, benchmarkChannelRecv, .{ channel, n_packets });
+        const handle = try std.Thread.spawn(.{}, readSocket, .{ socket, allocator, channel, &exit, .noop });
+        const recv_handle = try std.Thread.spawn(.{}, benchmarkChannelRecv, .{ channel, n_packets });
 
         var rand = std.rand.DefaultPrng.init(0);
         var packet_buf: [PACKET_DATA_SIZE]u8 = undefined;
@@ -197,7 +214,7 @@
 };
 
 pub fn benchmarkChannelRecv(
-    channel: *Channel(std.ArrayList(Packet)),
+    channel: *Channel(std.ArrayListUnmanaged(Packet)),
     n_values_to_receive: usize,
 ) !void {
     var count: usize = 0;
diff --git a/src/sync/channel.zig b/src/sync/channel.zig
index e7c92d0ed..337c2e029 100644
--- a/src/sync/channel.zig
+++ b/src/sync/channel.zig
@@ -46,7 +46,7 @@ pub fn Channel(comptime T: type) type {
             self.has_value.signal();
         }
 
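+        /// Appends the elements of `value` to the channel buffer. The caller keeps
+        /// ownership of the `value` slice itself, while ownership of each element's
+        /// resources moves to whoever drains the channel.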
-        pub fn sendBatch(self: *Self, value: std.ArrayList(T)) error{ OutOfMemory, ChannelClosed }!void {
+        pub fn sendBatch(self: *Self, value: []const T) error{ OutOfMemory, ChannelClosed }!void {
             if (self.closed.load(.monotonic)) {
                 return error.ChannelClosed;
             }
@@ -54,7 +54,7 @@
             defer buffer_lock.unlock();
 
             var buffer: *std.ArrayList(T) = buffer_lock.mut();
-            try buffer.appendSlice(value.items);
+            try buffer.appendSlice(value);
 
             self.has_value.signal();
         }
diff --git a/src/tvu/shred_receiver.zig b/src/tvu/shred_receiver.zig
new file mode 100644
index 000000000..1f73ebd96
--- /dev/null
+++ b/src/tvu/shred_receiver.zig
@@ -0,0 +1,112 @@
+const std = @import("std");
+const network = @import("zig-network");
+
+const bincode = @import("../bincode/bincode.zig");
+
+const Allocator = std.mem.Allocator;
+const ArrayList = std.ArrayList;
+const Atomic = std.atomic.Value;
+const KeyPair = std.crypto.sign.Ed25519.KeyPair;
+const Socket = network.Socket;
+
+const Channel = @import("../sync/channel.zig").Channel;
+const Logger = @import("../trace/log.zig").Logger;
+const Packet = @import("../net/packet.zig").Packet;
+const Ping = @import("../gossip/ping_pong.zig").Ping;
+const Pong = @import("../gossip/ping_pong.zig").Pong;
+const RepairMessage = @import("repair_message.zig").RepairMessage;
+const SocketThread = @import("../net/socket_utils.zig").SocketThread;
+const endpointToString = @import("../net/net.zig").endpointToString;
+
+/// Analogous to `ShredFetchStage`
+pub const ShredReceiver = struct {
+    allocator: Allocator,
+    keypair: *const KeyPair,
+    exit: *Atomic(bool),
+    logger: Logger,
+    socket: *Socket,
+
+    const Self = @This();
+
+    /// Run threads to listen/send over socket and handle all incoming packets.
+    /// Returns when exit is set to true.
+    pub fn run(self: *Self) !void {
+        defer self.logger.err("exiting shred receiver");
+        errdefer self.logger.err("error in shred receiver");
+
+        var sender = try SocketThread.initSender(self.allocator, self.logger, self.socket, self.exit);
+        defer sender.deinit();
+
+        var receiver = try SocketThread.initReceiver(self.allocator, self.logger, self.socket, self.exit);
+        defer receiver.deinit();
+
+        try self.runPacketHandler(receiver.channel, sender.channel);
+    }
+
+    /// Keep looping over packet channel and process the incoming packets.
+    /// Returns when exit is set to true.
+    fn runPacketHandler(
+        self: *Self,
+        receiver: *Channel(std.ArrayListUnmanaged(Packet)),
+        sender: *Channel(std.ArrayListUnmanaged(Packet)),
+    ) !void {
+        while (!self.exit.load(.unordered)) {
+            var responses = std.ArrayList(Packet).init(self.allocator);
+            errdefer responses.deinit();
+
+            if (try receiver.try_drain()) |batches| {
+                for (batches) |batch| for (batch.items) |*packet| {
+                    try self.handlePacket(packet, &responses);
+                };
+                if (responses.items.len > 0) {
+                    var unmanaged = responses.moveToUnmanaged();
+                    errdefer unmanaged.deinit(self.allocator);
+                    try sender.send(unmanaged);
+                }
+            } else {
+                std.time.sleep(10_000_000);
+            }
+        }
+    }
+
+    /// Handle a single packet, appending any response packets to `responses`.
+    fn handlePacket(self: *Self, packet: *const Packet, responses: *ArrayList(Packet)) !void {
+        if (packet.size == REPAIR_RESPONSE_SERIALIZED_PING_BYTES) {
+            try self.handlePing(packet, responses);
+        } else {
+            const endpoint_str = try endpointToString(self.allocator, &packet.addr);
+            defer endpoint_str.deinit();
+            self.logger.field("from_endpoint", endpoint_str.items)
+                .infof("tvu: recv unknown shred message: {} bytes", .{packet.size});
+        }
+    }
+
+    /// Handle a repair ping message, appending the pong reply to `responses`.
+    fn handlePing(self: *Self, packet: *const Packet, responses: *ArrayList(Packet)) !void {
+        const repair_ping = bincode.readFromSlice(self.allocator, RepairPing, &packet.data, .{}) catch |e| {
+            self.logger.errf("could not deserialize ping: {} - {any}", .{ e, packet.data[0..packet.size] });
+            return;
+        };
+        const ping = repair_ping.Ping;
+        ping.verify() catch |e| {
+            self.logger.errf("ping failed verification: {} - {any}", .{ e, packet.data[0..packet.size] });
+            return;
+        };
+
+        const reply = RepairMessage{ .Pong = try Pong.init(&ping, self.keypair) };
+        const reply_packet = try responses.addOne();
+        reply_packet.addr = packet.addr;
+        const reply_bytes = try bincode.writeToSlice(&reply_packet.data, reply, .{});
+        reply_packet.size = reply_bytes.len;
+
+        const endpoint_str = try endpointToString(self.allocator, &packet.addr);
+        defer endpoint_str.deinit();
+        self.logger.field("from_endpoint", endpoint_str.items)
+            .field("from_pubkey", &ping.from.string())
+            .info("tvu: recv repair ping");
+    }
+};
+
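+// A serialized repair ping is 132 bytes: a 4-byte enum tag followed by the
+// 128-byte Ping (32-byte pubkey + 32-byte token + 64-byte signature).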
+const REPAIR_RESPONSE_SERIALIZED_PING_BYTES = 132;
+
+const RepairPing = union(enum) { Ping: Ping };