Skip to content

Commit

Permalink
embed ResetEvent in Channel
Browse files Browse the repository at this point in the history
replaces SendSignal
  • Loading branch information
kprotty committed Jan 14, 2025
1 parent 1bb1158 commit 19c2493
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 110 deletions.
19 changes: 2 additions & 17 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,8 @@ pub const GossipService = struct {
outgoing_pipe: ?*SocketPipe = null,

/// communication between threads
packet_incoming_signal: *Channel(Packet).SendSignal,
packet_incoming_channel: *Channel(Packet),
packet_outgoing_channel: *Channel(Packet),
verified_incoming_signal: *Channel(GossipMessageWithEndpoint).SendSignal,
verified_incoming_channel: *Channel(GossipMessageWithEndpoint),

/// table to store gossip values
Expand Down Expand Up @@ -232,15 +230,6 @@ pub const GossipService = struct {
var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator);
errdefer verified_incoming_channel.destroy();

// setup signals
const packet_incoming_signal = try Channel(Packet).SendSignal.create(allocator);
packet_incoming_channel.send_hook = &packet_incoming_signal.hook;
errdefer packet_incoming_signal.destroy(allocator);

const verified_incoming_signal = try Channel(GossipMessageWithEndpoint).SendSignal.create(allocator);
verified_incoming_channel.send_hook = &verified_incoming_signal.hook;
errdefer verified_incoming_signal.destroy(allocator);

// setup the socket (bind with read-timeout)
const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified;
var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed;
Expand Down Expand Up @@ -307,10 +296,8 @@ pub const GossipService = struct {
.my_pubkey = my_pubkey,
.my_shred_version = Atomic(u16).init(my_shred_version),
.gossip_socket = gossip_socket,
.packet_incoming_signal = packet_incoming_signal,
.packet_incoming_channel = packet_incoming_channel,
.packet_outgoing_channel = packet_outgoing_channel,
.verified_incoming_signal = verified_incoming_signal,
.verified_incoming_channel = verified_incoming_channel,
.gossip_table_rw = RwMux(GossipTable).init(gossip_table),
.push_msg_queue_mux = Mux(ArrayList(GossipData)).init(ArrayList(GossipData).init(allocator)),
Expand Down Expand Up @@ -353,14 +340,12 @@ pub const GossipService = struct {
// everything should be cleaned up when the thread-pool joins.
std.debug.assert(self.packet_incoming_channel.isEmpty());
self.packet_incoming_channel.destroy();
self.packet_incoming_signal.destroy(self.allocator);

std.debug.assert(self.packet_outgoing_channel.isEmpty());
self.packet_outgoing_channel.destroy();

std.debug.assert(self.verified_incoming_channel.isEmpty());
self.verified_incoming_channel.destroy();
self.verified_incoming_signal.destroy(self.allocator);

self.gossip_socket.close();

Expand Down Expand Up @@ -540,7 +525,7 @@ pub const GossipService = struct {

// loop until the previous service closes and triggers us to close
while (true) {
self.packet_incoming_signal.wait(exit_condition) catch break;
self.packet_incoming_channel.wait(exit_condition) catch break;

// verify in parallel using the threadpool
// PERF: investigate CPU pinning
Expand Down Expand Up @@ -637,7 +622,7 @@ pub const GossipService = struct {
// - `exit` isn't set,
// - there isn't any data to process in the input channel, in order to block the join until we've finished
while (true) {
self.verified_incoming_signal.wait(exit_condition) catch break;
self.verified_incoming_channel.wait(exit_condition) catch break;

var msg_count: usize = 0;
while (self.verified_incoming_channel.tryReceive()) |message| {
Expand Down
35 changes: 9 additions & 26 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const LOG_SCOPE: []const u8 = "socket_utils";

pub const SocketPipe = struct {
handle: std.Thread,
outgoing_signal: Channel(Packet).SendSignal,

const Self = @This();

Expand All @@ -41,29 +40,17 @@ pub const SocketPipe = struct {
const self = try allocator.create(Self);
errdefer allocator.destroy(self);

switch (direction) {
.sender => {
self.outgoing_signal = .{};
channel.send_hook = &self.outgoing_signal.hook;
self.handle = try std.Thread.spawn(
.{},
runSender,
.{ self, logger, socket, channel, exit },
);
self.* = .{
.handle = switch (direction) {
.sender => try std.Thread.spawn(.{}, runSend, .{ logger, socket, channel, exit }),
.receiver => try std.Thread.spawn(.{}, runRecv, .{ logger, socket, channel, exit }),
},
.receiver => {
self.handle = try std.Thread.spawn(
.{},
runReceiver,
.{ logger, socket, channel, exit },
);
},
}
};

return self;
}

fn runReceiver(
fn runRecv(
logger_: Logger,
socket_: UdpSocket,
incoming_channel: *Channel(Packet),
Expand Down Expand Up @@ -96,8 +83,7 @@ pub const SocketPipe = struct {
}
}

fn runSender(
self: *Self,
fn runSend(
logger_: Logger,
socket: UdpSocket,
outgoing_channel: *Channel(Packet),
Expand All @@ -112,7 +98,7 @@ pub const SocketPipe = struct {
}

while (true) {
self.outgoing_signal.wait(exit) catch break;
outgoing_channel.wait(exit) catch break;
while (outgoing_channel.tryReceive()) |p| {
if (exit.shouldExit()) break; // drop the rest (like above) if exit prematurely.
const bytes_sent = socket.sendTo(p.addr, p.data[0..p.size]) catch |e| {
Expand Down Expand Up @@ -164,9 +150,6 @@ pub const BenchmarkPacketProcessing = struct {
var incoming_channel = try Channel(Packet).init(allocator);
defer incoming_channel.deinit();

var incoming_signal: Channel(Packet).SendSignal = .{};
incoming_channel.send_hook = &incoming_signal.hook;

const incoming_pipe = try SocketPipe
.init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition);
defer incoming_pipe.deinit(allocator);
Expand Down Expand Up @@ -219,7 +202,7 @@ pub const BenchmarkPacketProcessing = struct {
var packets_to_recv = n_packets;
var timer = try sig.time.Timer.start();
while (packets_to_recv > 0) {
incoming_signal.wait(exit_condition) catch break;
incoming_channel.wait(exit_condition) catch break;
while (incoming_channel.tryReceive()) |_| {
packets_to_recv -|= 1;
}
Expand Down
13 changes: 0 additions & 13 deletions src/shred_network/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,6 @@ pub fn start(
const retransmit_channel = try Channel(Packet).create(deps.allocator);
try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});

// signals (used to wait for channel senders)
const unverified_shred_signal = try Channel(Packet).SendSignal.create(arena);
unverified_shred_channel.send_hook = &unverified_shred_signal.hook;

const shreds_to_insert_signal = try Channel(Packet).SendSignal.create(arena);
shreds_to_insert_channel.send_hook = &shreds_to_insert_signal.hook;

const retransmit_signal = try Channel(Packet).SendSignal.create(arena);
retransmit_channel.send_hook = &retransmit_signal.hook;

// receiver (threads)
const shred_receiver = try arena.create(ShredReceiver);
shred_receiver.* = .{
Expand Down Expand Up @@ -130,7 +120,6 @@ pub fn start(
deps.exit,
deps.registry,
unverified_shred_channel,
unverified_shred_signal,
shreds_to_insert_channel,
retransmit_channel,
deps.epoch_context_mgr.slotLeaders(),
Expand All @@ -155,7 +144,6 @@ pub fn start(
deps.logger.unscoped(),
deps.registry,
shreds_to_insert_channel,
shreds_to_insert_signal,
shred_tracker,
deps.shred_inserter,
deps.epoch_context_mgr.slotLeaders(),
Expand All @@ -172,7 +160,6 @@ pub fn start(
.epoch_context_mgr = deps.epoch_context_mgr,
.gossip_table_rw = deps.gossip_table_rw,
.receiver = retransmit_channel,
.signal = retransmit_signal,
.maybe_num_retransmit_threads = deps.n_retransmit_threads,
.overwrite_stake_for_testing = deps.overwrite_turbine_stake_for_testing,
.exit = deps.exit,
Expand Down
3 changes: 1 addition & 2 deletions src/shred_network/shred_processor.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub fn runShredProcessor(
registry: *Registry(.{}),
// shred verifier --> me
verified_shred_receiver: *Channel(Packet),
verified_shred_signal: *Channel(Packet).SendSignal,
tracker: *BasicShredTracker,
shred_inserter_: ShredInserter,
leader_schedule: sig.core.leader_schedule.SlotLeaders,
Expand All @@ -44,7 +43,7 @@ pub fn runShredProcessor(
const metrics = try registry.initStruct(Metrics);

while (true) {
verified_shred_signal.wait(.{ .unordered = exit }) catch |e| switch (e) {
verified_shred_receiver.wait(.{ .unordered = exit }) catch |e| switch (e) {
error.Exit => if (verified_shred_receiver.isEmpty()) break,
};

Expand Down
30 changes: 24 additions & 6 deletions src/shred_network/shred_receiver.zig
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,35 @@ pub const ShredReceiver = struct {

const Self = @This();

/// A shared instance of an event to support waiting on multiple Channels.
///
/// One `ReceiverSignal` can have its `hook` installed as the `send_hook` of
/// several channels; a send on any of them sets the same `event`, so a single
/// consumer can block in `wait` for activity on all of them at once.
const ReceiverSignal = struct {
/// Set by `afterSend`, consumed (reset) by `wait`.
event: std.Thread.ResetEvent = .{},
/// Hook to install on a channel (`channel.send_hook = &signal.hook`).
/// It must stay embedded in this struct: `afterSend` recovers the
/// enclosing `ReceiverSignal` from the hook's address.
hook: Channel(Packet).SendHook = .{ .after_send = afterSend },

/// `after_send` callback for `hook`; presumably invoked by the Channel after
/// a send completes (the Channel implementation is not visible here — confirm).
/// Marks the shared event as set. The channel argument is ignored.
fn afterSend(hook: *Channel(Packet).SendHook, _: *Channel(Packet)) void {
// Recover the owning ReceiverSignal from the pointer to its `hook` field.
const signal: *ReceiverSignal = @alignCast(@fieldParentPtr("hook", hook));
signal.event.set();
}

/// Blocks until the event has been set or `exit` requests shutdown.
///
/// Returns `error.Exit` when `exit.shouldExit()` is true; this check runs
/// before the event check, so exit takes priority even if the event is set.
/// On a normal return the event has been reset for the next wait.
fn wait(signal: *ReceiverSignal, exit: ExitCondition) error{Exit}!void {
while (true) {
// Wait at most one second so the exit condition is re-checked periodically;
// the timeout error is deliberately ignored and handled by the checks below.
signal.event.timedWait(1 * std.time.ns_per_s) catch {};
if (exit.shouldExit()) return error.Exit;
// Only return if the event was actually set (not just a timeout); reset it
// so the next call blocks again until another send occurs.
if (signal.event.isSet()) return signal.event.reset();
}
}
};

/// Run threads to listen/send over socket and handle all incoming packets.
/// Returns when exit is set to true.
pub fn run(self: *Self) !void {
defer self.logger.err().log("exiting shred receiver");
errdefer self.logger.err().log("error in shred receiver");

var receive_signal: Channel(Packet).SendSignal = .{};
var receive_signal = ReceiverSignal{};
const exit = ExitCondition{ .unordered = self.exit };

// Cretae pipe from response_sender (SocketPipe overrides .send_hook) -> repair_socket
// Create pipe from response_sender -> repair_socket
const response_sender = try Channel(Packet).create(self.allocator);
defer response_sender.destroy();

Expand All @@ -65,11 +84,11 @@ pub const ShredReceiver = struct {
);
defer response_sender_pipe.deinit(self.allocator);

// Create pipe from repair_socket -> response_receiver (SendSignal overrides .send_hook)
// Create pipe from repair_socket -> response_receiver.
const response_receiver = try Channel(Packet).create(self.allocator);
response_receiver.send_hook = &receive_signal.hook;
defer response_receiver.destroy();

const response_receiver_pipe = try SocketPipe.init(
self.allocator,
.receiver,
Expand All @@ -80,14 +99,13 @@ pub const ShredReceiver = struct {
);
defer response_receiver_pipe.deinit(self.allocator);

// Create N pipes from turbine_socket -> turbine_receiver (SendSignal overrides .send_hook)
// Create N pipes from turbine_socket -> turbine_receiver.
const turbine_receiver = try Channel(Packet).create(self.allocator);
turbine_receiver.send_hook = &receive_signal.hook;
defer turbine_receiver.destroy();

var turbine_receiver_pipes: std.BoundedArray(*SocketPipe, NUM_TVU_RECEIVERS) = .{};
defer for (turbine_receiver_pipes.slice()) |pipe| pipe.deinit(self.allocator);

for (0..NUM_TVU_RECEIVERS) |_| {
try turbine_receiver_pipes.append(try SocketPipe.init(
self.allocator,
Expand Down
7 changes: 2 additions & 5 deletions src/shred_network/shred_retransmitter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ pub fn runShredRetransmitter(params: struct {
epoch_context_mgr: *EpochContextManager,
gossip_table_rw: *RwMux(sig.gossip.GossipTable),
receiver: *Channel(Packet),
signal: *Channel(Packet).SendSignal,
maybe_num_retransmit_threads: ?usize,
overwrite_stake_for_testing: bool,
exit: *AtomicBool,
Expand Down Expand Up @@ -90,7 +89,6 @@ pub fn runShredRetransmitter(params: struct {
params.my_contact_info,
params.epoch_context_mgr,
params.receiver,
params.signal,
&receive_to_retransmit_channel,
params.gossip_table_rw,
params.rand,
Expand Down Expand Up @@ -135,7 +133,6 @@ fn receiveShreds(
my_contact_info: ThreadSafeContactInfo,
epoch_context_mgr: *EpochContextManager,
receiver: *Channel(Packet),
signal: *Channel(Packet).SendSignal,
sender: *Channel(RetransmitShredInfo),
gossip_table_rw: *RwMux(sig.gossip.GossipTable),
rand: Random,
Expand All @@ -158,7 +155,7 @@ fn receiveShreds(
var receive_shreds_timer = try sig.time.Timer.start();

while (true) {
signal.wait(.{ .unordered = exit }) catch break;
receiver.wait(.{ .unordered = exit }) catch break;
receive_shreds_timer.reset();

const receiver_len = receiver.len();
Expand Down Expand Up @@ -327,7 +324,7 @@ fn retransmitShreds(
while (!exit.load(.acquire)) {
var retransmit_shred_timer = try sig.time.Timer.start();

// NOTE: multiple `retransmitShreds` run concurrently so there's no single receiver to use SendSignal with.
// NOTE: multiple `retransmitShreds` run concurrently, so there is no single receiver that could use receiver.wait() here.
const retransmit_info: RetransmitShredInfo = receiver.tryReceive() orelse continue;
defer retransmit_info.turbine_tree.releaseUnsafe();

Expand Down
3 changes: 1 addition & 2 deletions src/shred_network/shred_verifier.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub fn runShredVerifier(
registry: *Registry(.{}),
/// shred receiver --> me
unverified_shred_receiver: *Channel(Packet),
unverified_shred_signal: *Channel(Packet).SendSignal,
/// me --> shred processor
verified_shred_sender: *Channel(Packet),
/// me --> retransmit service
Expand All @@ -31,7 +30,7 @@ pub fn runShredVerifier(
const metrics = try registry.initStruct(Metrics);
var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024);
while (true) {
unverified_shred_signal.wait(.{ .unordered = exit }) catch |e| switch (e) {
unverified_shred_receiver.wait(.{ .unordered = exit }) catch |e| switch (e) {
error.Exit => if (unverified_shred_receiver.isEmpty()) break,
};

Expand Down
Loading

0 comments on commit 19c2493

Please sign in to comment.