diff --git a/src/gossip/fuzz.zig b/src/gossip/fuzz.zig index 9b134a1a3..d44021891 100644 --- a/src/gossip/fuzz.zig +++ b/src/gossip/fuzz.zig @@ -8,7 +8,7 @@ const socket_utils = @import("./socket_utils.zig"); const _gossip_service = @import("./gossip_service.zig"); const GossipService = _gossip_service.GossipService; const ChunkType = _gossip_service.ChunkType; -const crds_values_to_packets = _gossip_service.crdsValuesToPackets; +const crdsValuesToPackets = _gossip_service.crdsValuesToPackets; const MAX_PUSH_MESSAGE_PAYLOAD_SIZE = _gossip_service.MAX_PUSH_MESSAGE_PAYLOAD_SIZE; const Logger = @import("../trace/log.zig").Logger; @@ -119,14 +119,14 @@ pub fn randomPushMessage(rng: std.rand.Random, keypair: *const KeyPair, to_addr: } const allocator = std.heap.page_allocator; - const packets = try crds_values_to_packets( + const packets = try crdsValuesToPackets( allocator, &Pubkey.fromPublicKey(&keypair.public_key, false), &crds_values, &to_addr, ChunkType.PushMessage, ); - return packets.?; + return packets; } pub fn randomPullResponse(rng: std.rand.Random, keypair: *const KeyPair, to_addr: EndPoint) !std.ArrayList(Packet) { @@ -139,14 +139,14 @@ pub fn randomPullResponse(rng: std.rand.Random, keypair: *const KeyPair, to_addr } const allocator = std.heap.page_allocator; - const packets = try crds_values_to_packets( + const packets = try crdsValuesToPackets( allocator, &Pubkey.fromPublicKey(&keypair.public_key, false), &crds_values, &to_addr, ChunkType.PullResponse, ); - return packets.?; + return packets; } pub fn randomPullRequest(allocator: std.mem.Allocator, rng: std.rand.Random, keypair: *const KeyPair, to_addr: EndPoint) !Packet { diff --git a/src/gossip/gossip_service.zig b/src/gossip/gossip_service.zig index d2b8a9722..a34efbc04 100644 --- a/src/gossip/gossip_service.zig +++ b/src/gossip/gossip_service.zig @@ -148,7 +148,7 @@ pub const GossipService = struct { } var thread_pool = try allocator.create(ThreadPool); - var n_threads = @min(@as(u32, @truncate(std.Thread.getCpuCount() catch 0)), 8); + var n_threads = @min(@as(u32, @truncate(std.Thread.getCpuCount() catch 1)), 8); thread_pool.* = ThreadPool.init(.{ .max_threads = n_threads, .stack_size = 2 * 1024 * 1024, @@ -736,8 +736,8 @@ pub const GossipService = struct { defer valid_gossip_indexs.deinit(); var valid_gossip_peers: [NUM_ACTIVE_SET_ENTRIES]LegacyContactInfo = undefined; - for (valid_gossip_indexs.items) |i| { - valid_gossip_peers[i] = gossip_peers[i]; + for (0.., valid_gossip_indexs.items) |i, valid_gossip_index| { + valid_gossip_peers[i] = gossip_peers[valid_gossip_index]; } // send pings to peers @@ -754,7 +754,7 @@ pub const GossipService = struct { /// logic for building new push messages which are sent to peers from the /// active set and serialized into packets. - fn buildPushMessages(self: *Self, push_cursor: *u64) !?ArrayList(ArrayList(Packet)) { + fn buildPushMessages(self: *Self, push_cursor: *u64) !ArrayList(ArrayList(Packet)) { // TODO: find a better static value? var buf: [512]crds.CrdsVersionedValue = undefined; @@ -766,8 +766,11 @@ pub const GossipService = struct { break :blk crds_table.getEntriesWithCursor(&buf, push_cursor); }; + var packet_batch = ArrayList(ArrayList(Packet)).init(self.allocator); + errdefer packet_batch.deinit(); + if (crds_entries.len == 0) { - return null; + return packet_batch; } const now = getWallclockMs(); @@ -791,7 +794,7 @@ pub const GossipService = struct { var active_set: *const ActiveSet = active_set_lock.get(); defer active_set_lock.unlock(); - if (active_set.len() == 0) return null; + if (active_set.len() == 0) return packet_batch; for (crds_entries) |entry| { const value = entry.value; @@ -841,24 +844,21 @@ pub const GossipService = struct { const num_values_not_considered = crds_entries.len - num_values_considered; push_cursor.* -= num_values_not_considered; - var packet_batch = ArrayList(ArrayList(Packet)).init(self.allocator); - errdefer packet_batch.deinit(); - var push_iter = push_messages.iterator(); while (push_iter.next()) |push_entry| { const crds_values: *const ArrayList(CrdsValue) = push_entry.value_ptr; const to_endpoint: *const EndPoint = push_entry.key_ptr; // send the values as a pull response - var maybe_endpoint_packets = try crdsValuesToPackets( + var packets = try crdsValuesToPackets( self.allocator, &self.my_pubkey, crds_values.items, to_endpoint, ChunkType.PushMessage, ); - if (maybe_endpoint_packets) |endpoint_packets| { - try packet_batch.append(endpoint_packets); + if (packets.items.len > 0) { + try packet_batch.append(packets); } } return packet_batch; @@ -1041,19 +1041,19 @@ pub const GossipService = struct { std.atomic.Ordering.Release, ); - const maybe_packets = crdsValuesToPackets( + const packets = crdsValuesToPackets( self.allocator, self.my_pubkey, response_crds_values.items, self.from_endpoint, ChunkType.PullResponse, - ) catch { - return; - }; + ) catch return; - if (maybe_packets) |*packets| { + if (packets.items.len > 0) { defer packets.deinit(); - self.output.appendSlice(packets.items) catch unreachable; + self.output.appendSlice(packets.items) catch { + std.debug.panic("thread task: failed to append packets", .{}); + }; } } }; @@ -1686,8 +1686,9 @@ pub fn crdsValuesToPackets( crds_values: []CrdsValue, to_endpoint: *const EndPoint, chunk_type: ChunkType, -) error{ OutOfMemory, SerializationError }!?ArrayList(Packet) { - if (crds_values.len == 0) return null; +) error{ OutOfMemory, SerializationError }!ArrayList(Packet) { + if (crds_values.len == 0) + return ArrayList(Packet).init(allocator); const indexs = try chunkValuesIntoPacketIndexs( allocator, @@ -2230,7 +2231,7 @@ test "gossip.gossip_service: test build_push_messages" { clg.unlock(); var cursor: u64 = 0; - var msgs = (try gossip_service.buildPushMessages(&cursor)).?; + var msgs = try gossip_service.buildPushMessages(&cursor); try std.testing.expectEqual(cursor, 11); try std.testing.expect(msgs.items.len > 0); for (msgs.items) |*msg| msg.deinit(); @@ -2238,7 +2239,7 @@ test "gossip.gossip_service: test build_push_messages" { var msgs2 = try gossip_service.buildPushMessages(&cursor); try std.testing.expectEqual(cursor, 11); - try std.testing.expect(msgs2 == null); + try std.testing.expect(msgs2.items.len == 0); } test "gossip.gossip_service: test packet verification" {