Skip to content

Commit

Permalink
fix SendSignal lifetimes/initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
kprotty committed Jan 13, 2025
1 parent 5ebff6f commit 1bb1158
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ pub const GossipService = struct {
outgoing_pipe: ?*SocketPipe = null,

/// communication between threads
packet_incoming_signal: Channel(Packet).SendSignal = .{},
packet_incoming_signal: *Channel(Packet).SendSignal,
packet_incoming_channel: *Channel(Packet),
packet_outgoing_channel: *Channel(Packet),
verified_incoming_signal: Channel(GossipMessageWithEndpoint).SendSignal = .{},
verified_incoming_signal: *Channel(GossipMessageWithEndpoint).SendSignal,
verified_incoming_channel: *Channel(GossipMessageWithEndpoint),

/// table to store gossip values
Expand Down Expand Up @@ -232,6 +232,15 @@ pub const GossipService = struct {
var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator);
errdefer verified_incoming_channel.destroy();

// setup signals
const packet_incoming_signal = try Channel(Packet).SendSignal.create(allocator);
packet_incoming_channel.send_hook = &packet_incoming_signal.hook;
errdefer packet_incoming_signal.destroy(allocator);

const verified_incoming_signal = try Channel(GossipMessageWithEndpoint).SendSignal.create(allocator);
verified_incoming_channel.send_hook = &verified_incoming_signal.hook;
errdefer verified_incoming_signal.destroy(allocator);

// setup the socket (bind with read-timeout)
const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified;
var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed;
Expand Down Expand Up @@ -298,8 +307,10 @@ pub const GossipService = struct {
.my_pubkey = my_pubkey,
.my_shred_version = Atomic(u16).init(my_shred_version),
.gossip_socket = gossip_socket,
.packet_incoming_signal = packet_incoming_signal,
.packet_incoming_channel = packet_incoming_channel,
.packet_outgoing_channel = packet_outgoing_channel,
.verified_incoming_signal = verified_incoming_signal,
.verified_incoming_channel = verified_incoming_channel,
.gossip_table_rw = RwMux(GossipTable).init(gossip_table),
.push_msg_queue_mux = Mux(ArrayList(GossipData)).init(ArrayList(GossipData).init(allocator)),
Expand Down Expand Up @@ -342,12 +353,14 @@ pub const GossipService = struct {
// everything should be cleaned up when the thread-pool joins.
std.debug.assert(self.packet_incoming_channel.isEmpty());
self.packet_incoming_channel.destroy();
self.packet_incoming_signal.destroy(self.allocator);

std.debug.assert(self.packet_outgoing_channel.isEmpty());
self.packet_outgoing_channel.destroy();

std.debug.assert(self.verified_incoming_channel.isEmpty());
self.verified_incoming_channel.destroy();
self.verified_incoming_signal.destroy(self.allocator);

self.gossip_socket.close();

Expand Down Expand Up @@ -404,7 +417,6 @@ pub const GossipService = struct {
},
};

self.packet_incoming_channel.send_hook = &self.packet_incoming_signal.hook;
self.incoming_pipe = try SocketPipe.init(
self.allocator,
.receiver,
Expand All @@ -421,7 +433,6 @@ pub const GossipService = struct {
});
exit_condition.ordered.exit_index += 1;

self.verified_incoming_channel.send_hook = &self.verified_incoming_signal.hook;
try self.service_manager.spawn("[gossip] processMessages", processMessages, .{
self,
GOSSIP_PRNG_SEED,
Expand Down

0 comments on commit 1bb1158

Please sign in to comment.