diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 10792d1f3..5603d8784 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -153,10 +153,10 @@ pub const GossipService = struct { outgoing_pipe: ?*SocketPipe = null, /// communication between threads - packet_incoming_signal: Channel(Packet).SendSignal = .{}, + packet_incoming_signal: *Channel(Packet).SendSignal, packet_incoming_channel: *Channel(Packet), packet_outgoing_channel: *Channel(Packet), - verified_incoming_signal: Channel(GossipMessageWithEndpoint).SendSignal = .{}, + verified_incoming_signal: *Channel(GossipMessageWithEndpoint).SendSignal, verified_incoming_channel: *Channel(GossipMessageWithEndpoint), /// table to store gossip values @@ -232,6 +232,15 @@ pub const GossipService = struct { var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator); errdefer verified_incoming_channel.destroy(); + // setup signals + const packet_incoming_signal = try Channel(Packet).SendSignal.create(allocator); + packet_incoming_channel.send_hook = &packet_incoming_signal.hook; + errdefer packet_incoming_signal.destroy(allocator); + + const verified_incoming_signal = try Channel(GossipMessageWithEndpoint).SendSignal.create(allocator); + verified_incoming_channel.send_hook = &verified_incoming_signal.hook; + errdefer verified_incoming_signal.destroy(allocator); + // setup the socket (bind with read-timeout) const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified; var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed; @@ -298,8 +307,10 @@ pub const GossipService = struct { .my_pubkey = my_pubkey, .my_shred_version = Atomic(u16).init(my_shred_version), .gossip_socket = gossip_socket, + .packet_incoming_signal = packet_incoming_signal, .packet_incoming_channel = packet_incoming_channel, .packet_outgoing_channel = packet_outgoing_channel, + .verified_incoming_signal = verified_incoming_signal, .verified_incoming_channel = verified_incoming_channel, .gossip_table_rw = RwMux(GossipTable).init(gossip_table), .push_msg_queue_mux = Mux(ArrayList(GossipData)).init(ArrayList(GossipData).init(allocator)), @@ -342,12 +353,14 @@ pub const GossipService = struct { // everything should be cleaned up when the thread-pool joins. std.debug.assert(self.packet_incoming_channel.isEmpty()); self.packet_incoming_channel.destroy(); + self.packet_incoming_signal.destroy(self.allocator); std.debug.assert(self.packet_outgoing_channel.isEmpty()); self.packet_outgoing_channel.destroy(); std.debug.assert(self.verified_incoming_channel.isEmpty()); self.verified_incoming_channel.destroy(); + self.verified_incoming_signal.destroy(self.allocator); self.gossip_socket.close(); @@ -404,7 +417,6 @@ pub const GossipService = struct { }, }; - self.packet_incoming_channel.send_hook = &self.packet_incoming_signal.hook; self.incoming_pipe = try SocketPipe.init( self.allocator, .receiver, @@ -421,7 +433,6 @@ pub const GossipService = struct { }); exit_condition.ordered.exit_index += 1; - self.verified_incoming_channel.send_hook = &self.verified_incoming_signal.hook; try self.service_manager.spawn("[gossip] processMessages", processMessages, .{ self, GOSSIP_PRNG_SEED,