diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 5603d8784..b6fea4c4a 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -153,10 +153,8 @@ pub const GossipService = struct { outgoing_pipe: ?*SocketPipe = null, /// communication between threads - packet_incoming_signal: *Channel(Packet).SendSignal, packet_incoming_channel: *Channel(Packet), packet_outgoing_channel: *Channel(Packet), - verified_incoming_signal: *Channel(GossipMessageWithEndpoint).SendSignal, verified_incoming_channel: *Channel(GossipMessageWithEndpoint), /// table to store gossip values @@ -232,15 +230,6 @@ pub const GossipService = struct { var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator); errdefer verified_incoming_channel.destroy(); - // setup signals - const packet_incoming_signal = try Channel(Packet).SendSignal.create(allocator); - packet_incoming_channel.send_hook = &packet_incoming_signal.hook; - errdefer packet_incoming_signal.destroy(allocator); - - const verified_incoming_signal = try Channel(GossipMessageWithEndpoint).SendSignal.create(allocator); - verified_incoming_channel.send_hook = &verified_incoming_signal.hook; - errdefer verified_incoming_signal.destroy(allocator); - // setup the socket (bind with read-timeout) const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified; var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed; @@ -307,10 +296,8 @@ pub const GossipService = struct { .my_pubkey = my_pubkey, .my_shred_version = Atomic(u16).init(my_shred_version), .gossip_socket = gossip_socket, - .packet_incoming_signal = packet_incoming_signal, .packet_incoming_channel = packet_incoming_channel, .packet_outgoing_channel = packet_outgoing_channel, - .verified_incoming_signal = verified_incoming_signal, .verified_incoming_channel = verified_incoming_channel, .gossip_table_rw = RwMux(GossipTable).init(gossip_table), .push_msg_queue_mux = 
Mux(ArrayList(GossipData)).init(ArrayList(GossipData).init(allocator)), @@ -353,14 +340,12 @@ pub const GossipService = struct { // everything should be cleaned up when the thread-pool joins. std.debug.assert(self.packet_incoming_channel.isEmpty()); self.packet_incoming_channel.destroy(); - self.packet_incoming_signal.destroy(self.allocator); std.debug.assert(self.packet_outgoing_channel.isEmpty()); self.packet_outgoing_channel.destroy(); std.debug.assert(self.verified_incoming_channel.isEmpty()); self.verified_incoming_channel.destroy(); - self.verified_incoming_signal.destroy(self.allocator); self.gossip_socket.close(); @@ -540,7 +525,7 @@ pub const GossipService = struct { // loop until the previous service closes and triggers us to close while (true) { - self.packet_incoming_signal.wait(exit_condition) catch break; + self.packet_incoming_channel.wait(exit_condition) catch break; // verify in parallel using the threadpool // PERF: investigate CPU pinning @@ -637,7 +622,7 @@ pub const GossipService = struct { // - `exit` isn't set, // - there isn't any data to process in the input channel, in order to block the join until we've finished while (true) { - self.verified_incoming_signal.wait(exit_condition) catch break; + self.verified_incoming_channel.wait(exit_condition) catch break; var msg_count: usize = 0; while (self.verified_incoming_channel.tryReceive()) |message| { diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 6e5088a3f..f7153d9c2 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -20,7 +20,6 @@ const LOG_SCOPE: []const u8 = "socket_utils"; pub const SocketPipe = struct { handle: std.Thread, - outgoing_signal: Channel(Packet).SendSignal, const Self = @This(); @@ -41,29 +40,17 @@ pub const SocketPipe = struct { const self = try allocator.create(Self); errdefer allocator.destroy(self); - switch (direction) { - .sender => { - self.outgoing_signal = .{}; - channel.send_hook = &self.outgoing_signal.hook; - self.handle 
= try std.Thread.spawn( - .{}, - runSender, - .{ self, logger, socket, channel, exit }, - ); + self.* = .{ + .handle = switch (direction) { + .sender => try std.Thread.spawn(.{}, runSend, .{ logger, socket, channel, exit }), + .receiver => try std.Thread.spawn(.{}, runRecv, .{ logger, socket, channel, exit }), }, - .receiver => { - self.handle = try std.Thread.spawn( - .{}, - runReceiver, - .{ logger, socket, channel, exit }, - ); - }, - } + }; return self; } - fn runReceiver( + fn runRecv( logger_: Logger, socket_: UdpSocket, incoming_channel: *Channel(Packet), @@ -96,8 +83,7 @@ pub const SocketPipe = struct { } } - fn runSender( - self: *Self, + fn runSend( logger_: Logger, socket: UdpSocket, outgoing_channel: *Channel(Packet), @@ -112,7 +98,7 @@ pub const SocketPipe = struct { } while (true) { - self.outgoing_signal.wait(exit) catch break; + outgoing_channel.wait(exit) catch break; while (outgoing_channel.tryReceive()) |p| { if (exit.shouldExit()) break; // drop the rest (like above) if exit prematurely. 
const bytes_sent = socket.sendTo(p.addr, p.data[0..p.size]) catch |e| { @@ -164,9 +150,6 @@ pub const BenchmarkPacketProcessing = struct { var incoming_channel = try Channel(Packet).init(allocator); defer incoming_channel.deinit(); - var incoming_signal: Channel(Packet).SendSignal = .{}; - incoming_channel.send_hook = &incoming_signal.hook; - const incoming_pipe = try SocketPipe .init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition); defer incoming_pipe.deinit(allocator); @@ -219,7 +202,7 @@ pub const BenchmarkPacketProcessing = struct { var packets_to_recv = n_packets; var timer = try sig.time.Timer.start(); while (packets_to_recv > 0) { - incoming_signal.wait(exit_condition) catch break; + incoming_channel.wait(exit_condition) catch break; while (incoming_channel.tryReceive()) |_| { packets_to_recv -|= 1; } diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig index 3c47c4b3b..33400f683 100644 --- a/src/shred_network/service.zig +++ b/src/shred_network/service.zig @@ -92,16 +92,6 @@ pub fn start( const retransmit_channel = try Channel(Packet).create(deps.allocator); try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel}); - // signals (used to wait for channel senders) - const unverified_shred_signal = try Channel(Packet).SendSignal.create(arena); - unverified_shred_channel.send_hook = &unverified_shred_signal.hook; - - const shreds_to_insert_signal = try Channel(Packet).SendSignal.create(arena); - shreds_to_insert_channel.send_hook = &shreds_to_insert_signal.hook; - - const retransmit_signal = try Channel(Packet).SendSignal.create(arena); - retransmit_channel.send_hook = &retransmit_signal.hook; - // receiver (threads) const shred_receiver = try arena.create(ShredReceiver); shred_receiver.* = .{ @@ -130,7 +120,6 @@ pub fn start( deps.exit, deps.registry, unverified_shred_channel, - unverified_shred_signal, shreds_to_insert_channel, retransmit_channel, deps.epoch_context_mgr.slotLeaders(), @@ -155,7 +144,6 
@@ pub fn start( deps.logger.unscoped(), deps.registry, shreds_to_insert_channel, - shreds_to_insert_signal, shred_tracker, deps.shred_inserter, deps.epoch_context_mgr.slotLeaders(), @@ -172,7 +160,6 @@ pub fn start( .epoch_context_mgr = deps.epoch_context_mgr, .gossip_table_rw = deps.gossip_table_rw, .receiver = retransmit_channel, - .signal = retransmit_signal, .maybe_num_retransmit_threads = deps.n_retransmit_threads, .overwrite_stake_for_testing = deps.overwrite_turbine_stake_for_testing, .exit = deps.exit, diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig index 0e93903a9..e0c9da8ba 100644 --- a/src/shred_network/shred_processor.zig +++ b/src/shred_network/shred_processor.zig @@ -31,7 +31,6 @@ pub fn runShredProcessor( registry: *Registry(.{}), // shred verifier --> me verified_shred_receiver: *Channel(Packet), - verified_shred_signal: *Channel(Packet).SendSignal, tracker: *BasicShredTracker, shred_inserter_: ShredInserter, leader_schedule: sig.core.leader_schedule.SlotLeaders, @@ -44,7 +43,7 @@ pub fn runShredProcessor( const metrics = try registry.initStruct(Metrics); while (true) { - verified_shred_signal.wait(.{ .unordered = exit }) catch |e| switch (e) { + verified_shred_receiver.wait(.{ .unordered = exit }) catch |e| switch (e) { error.Exit => if (verified_shred_receiver.isEmpty()) break, }; diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig index 8d41b8f9c..cff7d4a28 100644 --- a/src/shred_network/shred_receiver.zig +++ b/src/shred_network/shred_receiver.zig @@ -42,16 +42,35 @@ pub const ShredReceiver = struct { const Self = @This(); + /// A shared instance of an event to support waiting on multiple Channels. 
+ const ReceiverSignal = struct { + event: std.Thread.ResetEvent = .{}, + hook: Channel(Packet).SendHook = .{ .after_send = afterSend }, + + fn afterSend(hook: *Channel(Packet).SendHook, _: *Channel(Packet)) void { + const signal: *ReceiverSignal = @alignCast(@fieldParentPtr("hook", hook)); + signal.event.set(); + } + + fn wait(signal: *ReceiverSignal, exit: ExitCondition) error{Exit}!void { + while (true) { + signal.event.timedWait(1 * std.time.ns_per_s) catch {}; + if (exit.shouldExit()) return error.Exit; + if (signal.event.isSet()) return signal.event.reset(); + } + } + }; + /// Run threads to listen/send over socket and handle all incoming packets. /// Returns when exit is set to true. pub fn run(self: *Self) !void { defer self.logger.err().log("exiting shred receiver"); errdefer self.logger.err().log("error in shred receiver"); - var receive_signal: Channel(Packet).SendSignal = .{}; + var receive_signal = ReceiverSignal{}; const exit = ExitCondition{ .unordered = self.exit }; - // Cretae pipe from response_sender (SocketPipe overrides .send_hook) -> repair_socket + // Create pipe from response_sender -> repair_socket const response_sender = try Channel(Packet).create(self.allocator); defer response_sender.destroy(); @@ -65,11 +84,11 @@ pub const ShredReceiver = struct { ); defer response_sender_pipe.deinit(self.allocator); - // Create pipe from repair_socket -> response_receiver (SendSignal overrides .send_hook) + // Create pipe from repair_socket -> response_receiver. 
const response_receiver = try Channel(Packet).create(self.allocator); response_receiver.send_hook = &receive_signal.hook; defer response_receiver.destroy(); - + const response_receiver_pipe = try SocketPipe.init( self.allocator, .receiver, @@ -80,14 +99,13 @@ pub const ShredReceiver = struct { ); defer response_receiver_pipe.deinit(self.allocator); - // Create N pipes from turbine_socket -> turbine_receiver (SendSignal overrides .send_hook) + // Create N pipes from turbine_socket -> turbine_receiver. const turbine_receiver = try Channel(Packet).create(self.allocator); turbine_receiver.send_hook = &receive_signal.hook; defer turbine_receiver.destroy(); var turbine_receiver_pipes: std.BoundedArray(*SocketPipe, NUM_TVU_RECEIVERS) = .{}; defer for (turbine_receiver_pipes.slice()) |pipe| pipe.deinit(self.allocator); - for (0..NUM_TVU_RECEIVERS) |_| { try turbine_receiver_pipes.append(try SocketPipe.init( self.allocator, diff --git a/src/shred_network/shred_retransmitter.zig b/src/shred_network/shred_retransmitter.zig index a9beef929..f4e02f888 100644 --- a/src/shred_network/shred_retransmitter.zig +++ b/src/shred_network/shred_retransmitter.zig @@ -50,7 +50,6 @@ pub fn runShredRetransmitter(params: struct { epoch_context_mgr: *EpochContextManager, gossip_table_rw: *RwMux(sig.gossip.GossipTable), receiver: *Channel(Packet), - signal: *Channel(Packet).SendSignal, maybe_num_retransmit_threads: ?usize, overwrite_stake_for_testing: bool, exit: *AtomicBool, @@ -90,7 +89,6 @@ pub fn runShredRetransmitter(params: struct { params.my_contact_info, params.epoch_context_mgr, params.receiver, - params.signal, &receive_to_retransmit_channel, params.gossip_table_rw, params.rand, @@ -135,7 +133,6 @@ fn receiveShreds( my_contact_info: ThreadSafeContactInfo, epoch_context_mgr: *EpochContextManager, receiver: *Channel(Packet), - signal: *Channel(Packet).SendSignal, sender: *Channel(RetransmitShredInfo), gossip_table_rw: *RwMux(sig.gossip.GossipTable), rand: Random, @@ -158,7 +155,7 @@ fn 
receiveShreds( var receive_shreds_timer = try sig.time.Timer.start(); while (true) { - signal.wait(.{ .unordered = exit }) catch break; + receiver.wait(.{ .unordered = exit }) catch break; receive_shreds_timer.reset(); const receiver_len = receiver.len(); @@ -327,7 +324,7 @@ fn retransmitShreds( while (!exit.load(.acquire)) { var retransmit_shred_timer = try sig.time.Timer.start(); - // NOTE: multiple `retransmitShreds` run concurrently so there's no single receiver to use SendSignal with. + // NOTE: multiple `retransmitShreds` run concurrently, so receiver.wait() can't be used here. const retransmit_info: RetransmitShredInfo = receiver.tryReceive() orelse continue; defer retransmit_info.turbine_tree.releaseUnsafe(); diff --git a/src/shred_network/shred_verifier.zig b/src/shred_network/shred_verifier.zig index 25764cbb4..d18633242 100644 --- a/src/shred_network/shred_verifier.zig +++ b/src/shred_network/shred_verifier.zig @@ -21,7 +21,6 @@ pub fn runShredVerifier( registry: *Registry(.{}), /// shred receiver --> me unverified_shred_receiver: *Channel(Packet), - unverified_shred_signal: *Channel(Packet).SendSignal, /// me --> shred processor verified_shred_sender: *Channel(Packet), /// me --> retransmit service @@ -31,7 +30,7 @@ pub fn runShredVerifier( const metrics = try registry.initStruct(Metrics); var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024); while (true) { - unverified_shred_signal.wait(.{ .unordered = exit }) catch |e| switch (e) { + unverified_shred_receiver.wait(.{ .unordered = exit }) catch |e| switch (e) { error.Exit => if (unverified_shred_receiver.isEmpty()) break, }; diff --git a/src/sync/channel.zig b/src/sync/channel.zig index 96c32e109..e62353a95 100644 --- a/src/sync/channel.zig +++ b/src/sync/channel.zig @@ -13,6 +13,7 @@ pub fn Channel(T: type) type { tail: Position, closed: Atomic(bool) = Atomic(bool).init(false), allocator: Allocator, + event: std.Thread.ResetEvent = .{}, send_hook: ?*SendHook = null, pub const 
SendHook = struct { @@ -28,34 +29,6 @@ pub fn Channel(T: type) type { } }; - pub const SendSignal = struct { - event: std.Thread.ResetEvent = .{}, - hook: SendHook = .{ .after_send = afterSend }, - - pub fn create(allocator: Allocator) !*SendSignal { - const self = try allocator.create(SendSignal); - self.* = .{}; - return self; - } - - pub fn destroy(self: *SendSignal, allocator: Allocator) void { - allocator.destroy(self); - } - - fn afterSend(hook: *SendHook, _: *Self) void { - const self: *@This() = @alignCast(@fieldParentPtr("hook", hook)); - self.event.set(); - } - - pub fn wait(self: *SendSignal, exit: ExitCondition) error{Exit}!void { - while (true) { - self.event.timedWait(1 * std.time.ns_per_s) catch {}; - if (exit.shouldExit()) return error.Exit; - if (self.event.isSet()) return self.event.reset(); - } - } - }; - const Self = @This(); const BLOCK_CAP = 31; const SHIFT = 1; @@ -245,6 +218,8 @@ pub fn Channel(T: type) type { // to read the data we've just assigned. _ = slot.state.fetchOr(WRITTEN_TO, .release); + channel.event.set(); + if (send_hook) |hook| hook.after_send(hook, channel); return; @@ -252,6 +227,16 @@ pub fn Channel(T: type) type { } } + /// Waits until the channel potentially has items, periodically checking for the ExitCondition. + /// Must be called by only one receiver thread at a time. + pub fn wait(channel: *Self, exit: ExitCondition) error{Exit}!void { + while (true) { + channel.event.timedWait(1 * std.time.ns_per_s) catch {}; + if (exit.shouldExit()) return error.Exit; + if (channel.event.isSet()) return channel.event.reset(); + } + } + /// Attempt to receive an item, returning immediately. /// Returns null if the channel is empty. 
pub fn tryReceive(channel: *Self) ?T { diff --git a/src/trace/log.zig b/src/trace/log.zig index ff66d608c..b9107b4e9 100644 --- a/src/trace/log.zig +++ b/src/trace/log.zig @@ -121,7 +121,6 @@ pub const ChannelPrintLogger = struct { log_allocator_state: *RecycleFBA(.{}), max_buffer: u64, channel: Channel([]const u8), - signal: Channel([]const u8).SendSignal, handle: ?std.Thread, const Self = @This(); @@ -144,10 +143,8 @@ pub const ChannelPrintLogger = struct { .max_level = config.max_level, .handle = null, .channel = try Channel([]const u8).init(config.allocator), - .signal = .{}, }; - self.channel.send_hook = &self.signal.hook; self.handle = try std.Thread.spawn(.{}, run, .{self}); return self; } @@ -175,7 +172,7 @@ pub const ChannelPrintLogger = struct { pub fn run(self: *Self) void { while (true) { - self.signal.wait(.{ .unordered = &self.exit }) catch break; + self.channel.wait(.{ .unordered = &self.exit }) catch break; while (self.channel.tryReceive()) |message| { defer self.log_allocator.free(message); diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index 5406c64bc..09cd2fe1b 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -47,7 +47,6 @@ pub const Service = struct { send_channel: *Channel(Packet), /// Put transactions onto this channel to send them. 
input_channel: *Channel(TransactionInfo), - input_signal: *Channel(TransactionInfo).SendSignal, exit: *AtomicBool, logger: ScopedLogger(@typeName(Self)), @@ -65,10 +64,6 @@ pub const Service = struct { const send_channel = try Channel(Packet).create(allocator); errdefer send_channel.destroy(); - const input_signal = try allocator.create(Channel(TransactionInfo).SendSignal); - input_signal.* = .{}; - input_channel.send_hook = &input_signal.hook; - return .{ .allocator = allocator, .config = config, @@ -86,7 +81,6 @@ pub const Service = struct { )), .send_channel = send_channel, .input_channel = input_channel, - .input_signal = input_signal, .logger = logger.withScope(@typeName(Self)), .exit = exit, }; @@ -130,7 +124,7 @@ pub const Service = struct { defer transaction_batch.deinit(); while (true) { - self.input_signal.wait(.{ .unordered = self.exit }) catch |e| switch (e) { + self.input_channel.wait(.{ .unordered = self.exit }) catch |e| switch (e) { error.Exit => if (self.input_channel.isEmpty()) break, };