From 0e8853c052f89093c2055b6c2f682ae8e8e82c41 Mon Sep 17 00:00:00 2001 From: protty <45520026+kprotty@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:01:56 -0600 Subject: [PATCH] perf(channel): hooks to avoid spinning wait (#486) * wip: using ChannelHooks and SocketPIpe * fixing SocketPipe usage * more SendSignal uses + fix Channel lifetimes * fix some hooks & exit conditions * style guide + turbine recv pipe cleanup * fix SendSignal lifetimes/initialization * embed ResetEvent in Channel replaces SendSignal * zig fmt * fix style * review fixes * channel.wait -> waitToReceive & check isEmpty * rename SocketPIpe to SocketThread - `init{Sender/Receiver}` -> `spawn{Sender/Receiver}` - `deinit(allocator)` -> `join()` also fixes Channel "send hook" tests after beforeSend was removed * remove NUM_TVU_RECEIVERS constant the number of turbine receivers was reduced to 1. Discussion: https://github.com/Syndica/sig/pull/486#discussion_r1917123358 * simplify turbine/repair socket thread handling * use channel wait in geyser + fix retransmitShreds comment --- src/cmd/cmd.zig | 2 +- src/geyser/core.zig | 4 +- src/geyser/main.zig | 4 +- src/gossip/service.zig | 63 +++-- src/net/socket_utils.zig | 308 ++++++++++++---------- src/shred_network/repair_service.zig | 24 +- src/shred_network/service.zig | 4 +- src/shred_network/shred_processor.zig | 6 +- src/shred_network/shred_receiver.zig | 103 ++++---- src/shred_network/shred_retransmitter.zig | 26 +- src/shred_network/shred_verifier.zig | 6 +- src/sync/channel.zig | 146 +++++----- src/trace/log.zig | 12 +- src/transaction_sender/service.zig | 11 +- 14 files changed, 382 insertions(+), 337 deletions(-) diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index d692c0a82..b763b1637 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -1185,7 +1185,7 @@ pub fn testTransactionSenderService() !void { // setup channel for communication to the tx-sender service const transaction_channel = try sig.sync.Channel(sig.transaction_sender.TransactionInfo).create(allocator); - defer transaction_channel.deinit(); + defer transaction_channel.destroy(); // this handles transactions and forwards them to leaders TPU ports var transaction_sender_service = try sig.transaction_sender.Service.init( diff --git a/src/geyser/core.zig b/src/geyser/core.zig index 060b406b7..00b125b9f 100644 --- a/src/geyser/core.zig +++ b/src/geyser/core.zig @@ -153,7 +153,9 @@ pub const GeyserWriter = struct { } pub fn IOStreamLoop(self: *Self) !void { - while (!self.exit.load(.acquire)) { + while (true) { + self.io_channel.waitToReceive(.{ .unordered = self.exit }) catch break; + while (self.io_channel.tryReceive()) |payload| { _ = self.writeToPipe(payload) catch |err| { if (err == WritePipeError.PipeBlockedWithExitSignaled) { diff --git a/src/geyser/main.zig b/src/geyser/main.zig index 465f82e30..19a1f2822 100644 --- a/src/geyser/main.zig +++ b/src/geyser/main.zig @@ -311,7 +311,9 @@ pub fn csvDumpIOWriter( var timer = try sig.time.Timer.start(); errdefer exit.store(true, .monotonic); - while (!exit.load(.monotonic)) { + while (true) { + io_channel.waitToReceive(.{ .unordered = exit }) catch break; + while (io_channel.tryReceive()) |csv_row| { // write to file try csv_file.writeAll(csv_row); diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 2fd4c422b..aaccda237 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -53,6 +53,7 @@ const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr; const ServiceManager = sig.utils.service_manager.ServiceManager; const 
Duration = sig.time.Duration; const ExitCondition = sig.sync.ExitCondition; +const SocketThread = sig.net.SocketThread; const endpointToString = sig.net.endpointToString; const globalRegistry = sig.prometheus.globalRegistry; @@ -104,7 +105,7 @@ const GOSSIP_PRNG_SEED = 19; /// The flow of data goes as follows: /// -/// `readSocket` -> +/// `SocketThread.initReceiver` -> /// - reads from the gossip socket /// - puts the new packet onto `packet_incoming_channel` /// - repeat until exit @@ -120,14 +121,14 @@ const GOSSIP_PRNG_SEED = 19; /// - processes the verified message it has received /// - depending on the type of message received, it may put something onto `packet_outgoing_channel` /// -/// `sendSocket` -> +/// `SocketThread.initSender` -> /// - receives from `packet_outgoing_channel` /// - sends the outgoing packet onto the gossip socket /// - repeats while `exit` is false and `packet_outgoing_channel` -/// - when `sendSocket` sees that `exit` has become `true`, it will begin waiting on +/// - when `SocketThread` sees that `exit` has become `true`, it will begin waiting on /// the previous thing in the chain to close, that usually being `processMessages`. /// this ensures that `processMessages` doesn't add new items to `packet_outgoing_channel` -/// after the `sendSocket` thread exits. +/// after the `SocketThread` exits. /// pub const GossipService = struct { /// used for general allocation purposes @@ -148,6 +149,11 @@ pub const GossipService = struct { /// Indicates if the gossip service is closed. closed: bool, + /// Piping data between the gossip_socket and the channels. + /// Set to null until start() is called as they represent threads. + incoming_socket_thread: ?*SocketThread = null, + outgoing_socket_thread: ?*SocketThread = null, + /// communication between threads packet_incoming_channel: *Channel(Packet), packet_outgoing_channel: *Channel(Packet), @@ -218,13 +224,13 @@ pub const GossipService = struct { // setup channels for communication between threads var packet_incoming_channel = try Channel(Packet).create(allocator); - errdefer packet_incoming_channel.deinit(); + errdefer packet_incoming_channel.destroy(); var packet_outgoing_channel = try Channel(Packet).create(allocator); - errdefer packet_outgoing_channel.deinit(); + errdefer packet_outgoing_channel.destroy(); var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator); - errdefer verified_incoming_channel.deinit(); + errdefer verified_incoming_channel.destroy(); // setup the socket (bind with read-timeout) const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified; @@ -328,19 +334,20 @@ pub const GossipService = struct { // wait for all threads to shutdown correctly self.service_manager.deinit(); + // Wait for pipes to shutdown if any + if (self.incoming_socket_thread) |thread| thread.join(); + if (self.outgoing_socket_thread) |thread| thread.join(); + // assert the channels are empty in order to make sure no data was lost. // everything should be cleaned up when the thread-pool joins. 
- std.debug.assert(self.packet_incoming_channel.len() == 0); - self.packet_incoming_channel.deinit(); - self.allocator.destroy(self.packet_incoming_channel); + std.debug.assert(self.packet_incoming_channel.isEmpty()); + self.packet_incoming_channel.destroy(); - std.debug.assert(self.packet_outgoing_channel.len() == 0); - self.packet_outgoing_channel.deinit(); - self.allocator.destroy(self.packet_outgoing_channel); + std.debug.assert(self.packet_outgoing_channel.isEmpty()); + self.packet_outgoing_channel.destroy(); - std.debug.assert(self.verified_incoming_channel.len() == 0); - self.verified_incoming_channel.deinit(); - self.allocator.destroy(self.verified_incoming_channel); + std.debug.assert(self.verified_incoming_channel.isEmpty()); + self.verified_incoming_channel.destroy(); self.gossip_socket.close(); @@ -387,7 +394,7 @@ pub const GossipService = struct { pub fn start( self: *Self, params: RunThreadsParams, - ) (std.mem.Allocator.Error || std.Thread.SpawnError)!void { + ) !void { // NOTE: this is stack copied on each spawn() call below so we can modify it without // affecting other threads var exit_condition = sig.sync.ExitCondition{ @@ -397,12 +404,13 @@ pub const GossipService = struct { }, }; - try self.service_manager.spawn("[gossip] readSocket", socket_utils.readSocket, .{ + self.incoming_socket_thread = try SocketThread.spawnReceiver( + self.allocator, + self.logger.unscoped(), self.gossip_socket, self.packet_incoming_channel, - self.logger.unscoped(), exit_condition, - }); + ); exit_condition.ordered.exit_index += 1; try self.service_manager.spawn("[gossip] verifyPackets", verifyPackets, .{ @@ -427,12 +435,13 @@ pub const GossipService = struct { exit_condition.ordered.exit_index += 1; } - try self.service_manager.spawn("[gossip] sendSocket", socket_utils.sendSocket, .{ + self.outgoing_socket_thread = try SocketThread.spawnSender( + self.allocator, + self.logger.unscoped(), self.gossip_socket, self.packet_outgoing_channel, - self.logger.unscoped(), exit_condition, - }); + ); exit_condition.ordered.exit_index += 1; if (params.dump) { @@ -514,7 +523,9 @@ pub const GossipService = struct { } // loop until the previous service closes and triggers us to close - while (exit_condition.shouldRun()) { + while (true) { + self.packet_incoming_channel.waitToReceive(exit_condition) catch break; + // verify in parallel using the threadpool // PERF: investigate CPU pinning var task_search_start_idx: usize = 0; @@ -609,7 +620,9 @@ pub const GossipService = struct { // keep waiting for new data until, // - `exit` isn't set, // - there isn't any data to process in the input channel, in order to block the join until we've finished - while (exit_condition.shouldRun()) { + while (true) { + self.verified_incoming_channel.waitToReceive(exit_condition) catch break; + var msg_count: usize = 0; while (self.verified_incoming_channel.tryReceive()) |message| { msg_count += 1; diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 586f53585..afd2a9e86 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -1,134 +1,131 @@ const std = @import("std"); const builtin = @import("builtin"); +const sig = @import("../sig.zig"); +const network = @import("zig-network"); + const Allocator = std.mem.Allocator; -const Atomic = std.atomic.Value; +const UdpSocket = network.Socket; -const sig = @import("../sig.zig"); const Packet = sig.net.Packet; const PACKET_DATA_SIZE = sig.net.PACKET_DATA_SIZE; const Channel = sig.sync.Channel; const Logger = sig.trace.Logger; const ExitCondition = 
sig.sync.ExitCondition; -const UdpSocket = @import("zig-network").Socket; - pub const SOCKET_TIMEOUT_US: usize = 1 * std.time.us_per_s; pub const PACKETS_PER_BATCH: usize = 64; // The identifier for the scoped logger used in this file. const LOG_SCOPE: []const u8 = "socket_utils"; -pub fn readSocket( - socket_: UdpSocket, - incoming_channel: *Channel(Packet), - logger_: Logger, - exit: ExitCondition, -) !void { - const logger = logger_.withScope(LOG_SCOPE); - defer { - exit.afterExit(); - logger.info().log("readSocket loop closed"); - } - - // NOTE: we set to non-blocking to periodically check if we should exit - var socket = socket_; - try socket.setReadTimeout(SOCKET_TIMEOUT_US); - - while (exit.shouldRun()) { - var packet: Packet = Packet.default(); - const recv_meta = socket.receiveFrom(&packet.data) catch |err| switch (err) { - error.WouldBlock => continue, - else => |e| { - logger.err().logf("readSocket error: {s}", .{@errorName(e)}); - return e; - }, - }; - const bytes_read = recv_meta.numberOfBytes; - if (bytes_read == 0) return error.SocketClosed; - packet.addr = recv_meta.sender; - packet.size = bytes_read; - try incoming_channel.send(packet); - } -} - -pub fn sendSocket( - socket: UdpSocket, - outgoing_channel: *Channel(Packet), - logger_: Logger, - exit: ExitCondition, -) !void { - const logger = logger_.withScope(LOG_SCOPE); - defer { - // empty the channel - while (outgoing_channel.tryReceive()) |_| {} - exit.afterExit(); - logger.debug().log("sendSocket loop closed"); - } - - while (exit.shouldRun()) { - while (outgoing_channel.tryReceive()) |p| { - const bytes_sent = socket.sendTo(p.addr, p.data[0..p.size]) catch |e| { - logger.err().logf("sendSocket error: {s}", .{@errorName(e)}); - continue; - }; - std.debug.assert(bytes_sent == p.size); - } - } -} - -/// A thread that is dedicated to either sending or receiving data over a socket. -/// The included channel can be used communicate with that thread. -/// -/// The channel only supports one: either sending or receiving, depending how it -/// was initialized. While you *could* send data to the channel for a "receiver" -/// socket, the underlying thread won't actually read the data from the channel. 
pub const SocketThread = struct { - channel: *Channel(Packet), - exit: *Atomic(bool), + allocator: Allocator, handle: std.Thread, - const Self = @This(); + pub fn spawnSender( + allocator: Allocator, + logger: Logger, + socket: UdpSocket, + outgoing_channel: *Channel(Packet), + exit: ExitCondition, + ) !*SocketThread { + return spawn(allocator, logger, socket, outgoing_channel, exit, runSender); + } - pub fn initSender( + pub fn spawnReceiver( allocator: Allocator, logger: Logger, socket: UdpSocket, - exit: *Atomic(bool), - ) !Self { - const channel = try Channel(Packet).create(allocator); - return .{ - .channel = channel, - .exit = exit, - .handle = try std.Thread.spawn( - .{}, - sendSocket, - .{ socket, channel, logger, .{ .unordered = exit } }, - ), - }; + incoming_channel: *Channel(Packet), + exit: ExitCondition, + ) !*SocketThread { + return spawn(allocator, logger, socket, incoming_channel, exit, runReceiver); } - pub fn initReceiver( + fn spawn( allocator: Allocator, logger: Logger, socket: UdpSocket, - exit: *Atomic(bool), - ) !Self { - const channel = try Channel(Packet).create(allocator); - return .{ - .channel = channel, - .exit = exit, - .handle = try std.Thread.spawn( - .{}, - readSocket, - .{ socket, channel, logger, .{ .unordered = exit } }, - ), + channel: *Channel(Packet), + exit: ExitCondition, + comptime runFn: anytype, + ) !*SocketThread { + // TODO(king): store event-loop data in SocketThread (hence, heap-alloc).. + const self = try allocator.create(SocketThread); + errdefer allocator.destroy(self); + + self.* = .{ + .allocator = allocator, + .handle = try std.Thread.spawn(.{}, runFn, .{ logger, socket, channel, exit }), }; + + return self; } - pub fn deinit(self: Self, allocator: Allocator) void { + pub fn join(self: *SocketThread) void { self.handle.join(); - self.channel.deinit(); - allocator.destroy(self.channel); + self.allocator.destroy(self); + } + + fn runReceiver( + logger_: Logger, + socket_: UdpSocket, + incoming_channel: *Channel(Packet), + exit: ExitCondition, + ) !void { + const logger = logger_.withScope(LOG_SCOPE); + defer { + exit.afterExit(); + logger.info().log("readSocket loop closed"); + } + + // NOTE: we set to non-blocking to periodically check if we should exit + var socket = socket_; + try socket.setReadTimeout(SOCKET_TIMEOUT_US); + + while (exit.shouldRun()) { + var packet: Packet = Packet.default(); + const recv_meta = socket.receiveFrom(&packet.data) catch |err| switch (err) { + error.WouldBlock => continue, + else => |e| { + logger.err().logf("readSocket error: {s}", .{@errorName(e)}); + return e; + }, + }; + const bytes_read = recv_meta.numberOfBytes; + if (bytes_read == 0) return error.SocketClosed; + packet.addr = recv_meta.sender; + packet.size = bytes_read; + try incoming_channel.send(packet); + } + } + + fn runSender( + logger_: Logger, + socket: UdpSocket, + outgoing_channel: *Channel(Packet), + exit: ExitCondition, + ) !void { + const logger = logger_.withScope(LOG_SCOPE); + defer { + // empty the channel + while (outgoing_channel.tryReceive()) |_| {} + exit.afterExit(); + logger.debug().log("sendSocket loop closed"); + } + + while (true) { + outgoing_channel.waitToReceive(exit) catch break; + + while (outgoing_channel.tryReceive()) |p| { + if (exit.shouldExit()) return; // drop the rest (like above) if exit prematurely. 
+ const bytes_sent = socket.sendTo(p.addr, p.data[0..p.size]) catch |e| { + logger.err().logf("sendSocket error: {s}", .{@errorName(e)}); + continue; + }; + std.debug.assert(bytes_sent == p.size); + } + } } }; @@ -152,9 +149,6 @@ pub const BenchmarkPacketProcessing = struct { const n_packets = bench_args.n_packets; const allocator = if (builtin.is_test) std.testing.allocator else std.heap.c_allocator; - var channel = try Channel(Packet).init(allocator); - defer channel.deinit(); - var socket = try UdpSocket.create(.ipv4, .udp); try socket.bindToPort(0); try socket.setReadTimeout(std.time.us_per_s); // 1 second @@ -162,62 +156,86 @@ pub const BenchmarkPacketProcessing = struct { const to_endpoint = try socket.getLocalEndPoint(); var exit_flag = std.atomic.Value(bool).init(false); - const exit_condition = ExitCondition{ - .unordered = &exit_flag, + const exit_condition = ExitCondition{ .unordered = &exit_flag }; + + // Setup incoming + + var incoming_channel = try Channel(Packet).init(allocator); + defer incoming_channel.deinit(); + + const incoming_pipe = try SocketThread.spawnReceiver( + allocator, + .noop, + socket, + &incoming_channel, + exit_condition, + ); + defer incoming_pipe.join(); + + // Start outgoing + + const S = struct { + fn sender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void { + var i: usize = 0; + var packet: Packet = undefined; + var prng = std.rand.DefaultPrng.init(0); + var timer = try std.time.Timer.start(); + + while (e.shouldRun()) { + prng.fill(&packet.data); + packet.addr = addr; + packet.size = PACKET_DATA_SIZE; + try channel.send(packet); + + // 10Kb per second, until one second + // each packet is 1k bytes + // = 10 packets per second + i += 1; + if (i % 10 == 0) { + const elapsed = timer.read(); + if (elapsed < std.time.ns_per_s) { + std.time.sleep(std.time.ns_per_s); + } + } + } + } }; - var handle = try std.Thread.spawn( - .{}, - readSocket, - .{ socket, &channel, .noop, exit_condition }, + + var outgoing_channel = try Channel(Packet).init(allocator); + defer outgoing_channel.deinit(); + + const outgoing_pipe = try SocketThread.spawnSender( + allocator, + .noop, + socket, + &outgoing_channel, + exit_condition, ); - defer { - exit_condition.setExit(); - handle.join(); - } - var recv_handle = try std.Thread.spawn( + defer outgoing_pipe.join(); + + const outgoing_handle = try std.Thread.spawn( .{}, - benchmarkChannelRecv, - .{ &channel, n_packets }, + S.sender, + .{ &outgoing_channel, to_endpoint, exit_condition }, ); + defer outgoing_handle.join(); - var prng = std.rand.DefaultPrng.init(0); - var packet_buf: [PACKET_DATA_SIZE]u8 = undefined; - var timer = try sig.time.Timer.start(); + // run incoming until received n_packets - // NOTE: send more packets than we need because UDP drops some - for (1..(n_packets * 2 + 1)) |i| { - prng.fill(&packet_buf); - _ = try socket.sendTo(to_endpoint, &packet_buf); - - // 10Kb per second - // each packet is 1k bytes - // = 10 packets per second - if (i % 10 == 0) { - const elapsed = timer.read(); - if (elapsed.asNanos() < std.time.ns_per_s) { - std.time.sleep(std.time.ns_per_s - elapsed.asNanos()); - } + var packets_to_recv = n_packets; + var timer = try sig.time.Timer.start(); + while (packets_to_recv > 0) { + incoming_channel.waitToReceive(exit_condition) catch break; + while (incoming_channel.tryReceive()) |_| { + packets_to_recv -|= 1; } } - recv_handle.join(); + exit_condition.setExit(); // kill benchSender and join it on defer. 
return timer.read(); } }; -pub fn benchmarkChannelRecv( - channel: *Channel(Packet), - n_values_to_receive: usize, -) !void { - var count: usize = 0; - while (count < n_values_to_receive) { - if (channel.tryReceive()) |i| { - std.mem.doNotOptimizeAway(i); - count += 1; - } - } -} - test "benchmark packet processing" { _ = try BenchmarkPacketProcessing.benchmarkReadSocket(.{ .n_packets = 100_000, diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig index 65243effb..7b0573409 100644 --- a/src/shred_network/repair_service.zig +++ b/src/shred_network/repair_service.zig @@ -30,6 +30,7 @@ const RwMux = sig.sync.RwMux; const SignedGossipData = sig.gossip.SignedGossipData; const SocketAddr = sig.net.SocketAddr; const SocketThread = sig.net.SocketThread; +const Channel = sig.sync.Channel; const Slot = sig.core.Slot; const RepairRequest = shred_network.repair_message.RepairRequest; @@ -251,7 +252,8 @@ pub const RepairRequester = struct { logger: ScopedLogger(@typeName(Self)), random: Random, keypair: *const KeyPair, - sender: SocketThread, + sender_thread: *SocketThread, + sender_channel: *Channel(Packet), metrics: Metrics, const Self = @This(); @@ -272,19 +274,31 @@ pub const RepairRequester = struct { udp_send_socket: Socket, exit: *Atomic(bool), ) !Self { - const sndr = try SocketThread.initSender(allocator, logger, udp_send_socket, exit); + const channel = try Channel(Packet).create(allocator); + errdefer channel.destroy(); + + const thread = try SocketThread.spawnSender( + allocator, + logger, + udp_send_socket, + channel, + .{ .unordered = exit }, + ); + return .{ .allocator = allocator, .logger = logger.withScope(@typeName(Self)), .random = random, .keypair = keypair, - .sender = sndr, + .sender_thread = thread, + .sender_channel = channel, .metrics = try registry.initStruct(Metrics), }; } pub fn deinit(self: Self) void { - self.sender.deinit(self.allocator); + self.sender_thread.join(); + self.sender_channel.destroy(); } pub fn sendRepairRequestBatch( @@ -309,7 +323,7 @@ pub const RepairRequester = struct { self.random.int(Nonce), ); packet.size = data.len; - try self.sender.channel.send(packet); + try self.sender_channel.send(packet); self.metrics.pending_requests.dec(); self.metrics.sent_request_count.inc(); } diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig index 125f4547a..33400f683 100644 --- a/src/shred_network/service.zig +++ b/src/shred_network/service.zig @@ -84,12 +84,12 @@ pub fn start( const repair_socket = try bindUdpReusable(conf.repair_port); const turbine_socket = try bindUdpReusable(conf.turbine_recv_port); - // channels + // channels (cant use arena as they need to alloc/free frequently & potentially from multiple sender threads) const unverified_shred_channel = try Channel(Packet).create(deps.allocator); try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel}); const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator); try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel}); - const retransmit_channel = try Channel(sig.net.Packet).create(deps.allocator); + const retransmit_channel = try Channel(Packet).create(deps.allocator); try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel}); // receiver (threads) diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig index 9b018c0e4..f41cb41c9 100644 --- a/src/shred_network/shred_processor.zig +++ b/src/shred_network/shred_processor.zig @@ -42,9 +42,9 @@ pub fn 
runShredProcessor( var error_context: ErrorContext = .{}; const metrics = try registry.initStruct(Metrics); - while (!exit.load(.acquire) or - verified_shred_receiver.len() != 0) - { + while (true) { + verified_shred_receiver.waitToReceive(.{ .unordered = exit }) catch break; + shreds.clearRetainingCapacity(); is_repaired.clearRetainingCapacity(); while (verified_shred_receiver.tryReceive()) |packet| { diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig index 2e716980b..504eace2f 100644 --- a/src/shred_network/shred_receiver.zig +++ b/src/shred_network/shred_receiver.zig @@ -21,10 +21,9 @@ const Pong = sig.gossip.Pong; const RepairMessage = shred_network.repair_message.RepairMessage; const Slot = sig.core.Slot; const SocketThread = sig.net.SocketThread; +const ExitCondition = sig.sync.ExitCondition; const VariantCounter = sig.prometheus.VariantCounter; -const NUM_TVU_RECEIVERS = 2; - /// Analogous to [ShredFetchStage](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/shred_fetch_stage.rs#L34) pub const ShredReceiver = struct { allocator: Allocator, @@ -47,61 +46,73 @@ pub const ShredReceiver = struct { defer self.logger.err().log("exiting shred receiver"); errdefer self.logger.err().log("error in shred receiver"); - var response_sender = try SocketThread - .initSender(self.allocator, self.logger.unscoped(), self.repair_socket, self.exit); - defer response_sender.deinit(self.allocator); - var repair_receiver = try SocketThread - .initReceiver(self.allocator, self.logger.unscoped(), self.repair_socket, self.exit); - defer repair_receiver.deinit(self.allocator); - - var turbine_receivers: [NUM_TVU_RECEIVERS]SocketThread = undefined; - for (0..NUM_TVU_RECEIVERS) |i| { - turbine_receivers[i] = try SocketThread.initReceiver( - self.allocator, - self.logger.unscoped(), - self.turbine_socket, - self.exit, - ); - } - defer for (turbine_receivers) |r| r.deinit(self.allocator); + const exit = ExitCondition{ .unordered = self.exit }; - var turbine_channels: [NUM_TVU_RECEIVERS]*Channel(Packet) = undefined; - for (&turbine_receivers, &turbine_channels) |*receiver, *channel| { - channel.* = receiver.channel; - } + // Cretae pipe from response_sender -> repair_socket + const response_sender = try Channel(Packet).create(self.allocator); + defer response_sender.destroy(); - const turbine_thread = try std.Thread.spawn( - .{}, - Self.runPacketHandler, - .{ self, &turbine_channels, response_sender.channel, false }, - ); - const receiver_thread = try std.Thread.spawn( - .{}, - Self.runPacketHandler, - .{ self, &.{repair_receiver.channel}, response_sender.channel, true }, + const response_sender_thread = try SocketThread.spawnSender( + self.allocator, + self.logger.unscoped(), + self.repair_socket, + response_sender, + exit, ); - turbine_thread.join(); - receiver_thread.join(); + defer response_sender_thread.join(); + + // Run a packetHandler thread which pipes from repair_socket -> handlePacket. + const response_thread = try std.Thread.spawn(.{}, runPacketHandler, .{ + self, + response_sender, + self.repair_socket, + exit, + true, // is_repair + }); + defer response_thread.join(); + + // Run a packetHandler thread which pipes from turbine_socket -> handlePacket. + const turbine_thread = try std.Thread.spawn(.{}, runPacketHandler, .{ + self, + response_sender, + self.turbine_socket, + exit, + false, // is_repair + }); + defer turbine_thread.join(); } - /// Keep looping over packet channel and process the incoming packets. 
- /// Returns when exit is set to true. fn runPacketHandler( self: *Self, - receivers: []const *Channel(Packet), response_sender: *Channel(Packet), + receiver_socket: Socket, + exit: ExitCondition, comptime is_repair: bool, ) !void { - while (!self.exit.load(.acquire)) { - for (receivers) |receiver| { - var packet_count: usize = 0; - while (receiver.tryReceive()) |packet| { - self.metrics.incReceived(is_repair); - packet_count += 1; - try self.handlePacket(packet, response_sender, is_repair); - } - self.metrics.observeBatchSize(is_repair, packet_count); + // Setup a channel. + const receiver = try Channel(Packet).create(self.allocator); + defer receiver.destroy(); + + // Receive from the socket into the channel. + const receiver_thread = try SocketThread.spawnReceiver( + self.allocator, + self.logger.unscoped(), + receiver_socket, + receiver, + exit, + ); + defer receiver_thread.join(); + + // Handle packets from the channel. + while (true) { + receiver.waitToReceive(exit) catch break; + var packet_count: usize = 0; + while (receiver.tryReceive()) |packet| { + self.metrics.incReceived(is_repair); + packet_count += 1; + try self.handlePacket(packet, response_sender, is_repair); } + self.metrics.observeBatchSize(is_repair, packet_count); } } diff --git a/src/shred_network/shred_retransmitter.zig b/src/shred_network/shred_retransmitter.zig index d0c278953..214cd3b0d 100644 --- a/src/shred_network/shred_retransmitter.zig +++ b/src/shred_network/shred_retransmitter.zig @@ -113,16 +113,14 @@ pub fn runShredRetransmitter(params: struct { )); } - try thread_handles.append(try std.Thread.spawn( - .{}, - socket_utils.sendSocket, - .{ - retransmit_socket, - &retransmit_to_socket_channel, - params.logger, - .{ .unordered = params.exit }, - }, - )); + const sender_thread = try socket_utils.SocketThread.spawnSender( + params.allocator, + params.logger, + retransmit_socket, + &retransmit_to_socket_channel, + .{ .unordered = params.exit }, + ); + defer sender_thread.join(); for (thread_handles.items) |thread| thread.join(); } @@ -153,9 +151,11 @@ fn receiveShreds( defer deduper.deinit(); var shreds = std.ArrayList(Packet).init(allocator); + var receive_shreds_timer = try sig.time.Timer.start(); - while (!exit.load(.acquire)) { - var receive_shreds_timer = try sig.time.Timer.start(); + while (true) { + receiver.waitToReceive(.{ .unordered = exit }) catch break; + receive_shreds_timer.reset(); const receiver_len = receiver.len(); if (receiver_len == 0) continue; @@ -323,6 +323,8 @@ fn retransmitShreds( while (!exit.load(.acquire)) { var retransmit_shred_timer = try sig.time.Timer.start(); + // NOTE: multiple `retransmitShreds` run concurrently so we can't use + // `receiver.waitToReceive()` here as it only supports one caller thread. 
const retransmit_info: RetransmitShredInfo = receiver.tryReceive() orelse continue; defer retransmit_info.turbine_tree.releaseUnsafe(); diff --git a/src/shred_network/shred_verifier.zig b/src/shred_network/shred_verifier.zig index 1c7332df1..b69d96f90 100644 --- a/src/shred_network/shred_verifier.zig +++ b/src/shred_network/shred_verifier.zig @@ -29,9 +29,9 @@ pub fn runShredVerifier( ) !void { const metrics = try registry.initStruct(Metrics); var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024); - while (!exit.load(.acquire) or - unverified_shred_receiver.len() != 0) - { + while (true) { + unverified_shred_receiver.waitToReceive(.{ .unordered = exit }) catch break; + var packet_count: usize = 0; while (unverified_shred_receiver.tryReceive()) |packet| { packet_count += 1; diff --git a/src/sync/channel.zig b/src/sync/channel.zig index 60bf88f86..632f0e747 100644 --- a/src/sync/channel.zig +++ b/src/sync/channel.zig @@ -5,14 +5,23 @@ const Backoff = @import("backoff.zig").Backoff; const Atomic = std.atomic.Value; const Allocator = std.mem.Allocator; +const ExitCondition = sig.sync.ExitCondition; + pub fn Channel(T: type) type { return struct { head: Position, tail: Position, closed: Atomic(bool) = Atomic(bool).init(false), allocator: Allocator, - mutex: std.Thread.Mutex = .{}, - condition: std.Thread.Condition = .{}, + event: std.Thread.ResetEvent = .{}, + send_hook: ?*SendHook = null, + + pub const SendHook = struct { + /// Called after the channel has pushed the value. + after_send: *const fn (*SendHook, *Self) void = defaultAfterSend, + + fn defaultAfterSend(_: *SendHook, _: *Self) void {} + }; const Self = @This(); const BLOCK_CAP = 31; @@ -134,13 +143,15 @@ pub fn Channel(T: type) type { pub fn close(channel: *Self) void { channel.closed.store(true, .monotonic); - channel.condition.broadcast(); } pub fn send(channel: *Self, value: T) !void { if (channel.closed.load(.monotonic)) { return error.ChannelClosed; } + + const send_hook = channel.send_hook; + var backoff: Backoff = .{}; var tail = channel.tail.index.load(.acquire); var block = channel.tail.block.load(.acquire); @@ -195,58 +206,24 @@ pub fn Channel(T: type) type { // to read the data we've just assigned. _ = slot.state.fetchOr(WRITTEN_TO, .release); - channel.mutex.lock(); - channel.condition.signal(); - channel.mutex.unlock(); - return; - } - } - } + channel.event.set(); - /// Attempt to receive an item. If the channel is empty, waits until - /// an item is available or the channel is closed. - /// - /// Returns: - /// - T: when an item is available. - /// - error.ChannelClosed: if the channel is both empty and closed. - pub fn receive(channel: *Self) error{ChannelClosed}!T { - while (true) { - if (channel.tryReceive()) |item| { - return item; - } else if (channel.closed.load(.monotonic)) { - return error.ChannelClosed; + if (send_hook) |hook| { + hook.after_send(hook, channel); + } + + return; } - channel.mutex.lock(); - channel.condition.wait(&channel.mutex); - channel.mutex.unlock(); } } - /// Attempt to receive an item. If the channel is empty, waits until - /// the timeout, an item is available, or the channel is closed. - /// - /// Returns: - /// - T: if item was available before the timeout. - /// - null: if empty and timed out before an item arrived. - /// - error.ChannelClosed: if the channel is both empty and closed. 
- pub fn receiveTimeout(channel: *Self, timeout: sig.time.Duration) error{ChannelClosed}!?T { - const end = std.time.nanoTimestamp() + timeout.asNanos(); - while (true) { - if (channel.tryReceive()) |item| { - return item; - } else if (channel.closed.load(.monotonic)) { - return error.ChannelClosed; - } - const now = std.time.nanoTimestamp(); - if (now > end) { - return null; - } - channel.mutex.lock(); - defer channel.mutex.unlock(); - channel.condition.timedWait(&channel.mutex, @intCast(end - now)) catch |e| - switch (e) { - error.Timeout => return null, - }; + /// Waits untli the channel potentially has items, periodically checking for the ExitCondition. + /// Must be called by only one receiver thread at a time. + pub fn waitToReceive(channel: *Self, exit: ExitCondition) error{Exit}!void { + while (channel.isEmpty()) { + channel.event.timedWait(1 * std.time.ns_per_s) catch {}; + if (exit.shouldExit()) return error.Exit; + if (channel.event.isSet()) return channel.event.reset(); } } @@ -473,53 +450,52 @@ test "spsc" { producer.join(); } -test "blocking receive" { - const S = struct { - fn producer(ch: *Channel(u64)) !void { - try ch.send(123); - } +test "send-hook" { + const Counter = struct { + count: usize = 0, + hook: Channel(u64).SendHook = .{ .after_send = afterSend }, - fn consumer(ch: *Channel(u64)) void { - std.debug.assert(123 == ch.receive() catch @panic("error receiving")); + fn afterSend(hook: *Channel(u64).SendHook, channel: *Channel(u64)) void { + const self: *@This() = @alignCast(@fieldParentPtr("hook", hook)); + self.count += 1; + std.debug.assert(channel.len() == self.count); } }; - var ch = try Channel(u64).init(std.testing.allocator); - defer ch.deinit(); + const Consumer = struct { + collected: std.ArrayList(u64), + hook: Channel(u64).SendHook = .{ .after_send = afterSend }, - const consumer = try std.Thread.spawn(.{}, S.consumer, .{&ch}); - const producer = try std.Thread.spawn(.{}, S.producer, .{&ch}); - - consumer.join(); - producer.join(); -} - -test "timeout receive receives" { - const S = struct { - fn producer(ch: *Channel(u64)) !void { - try ch.send(123); - } - - fn consumer(ch: *Channel(u64)) void { - std.debug.assert(123 == ch.receiveTimeout(sig.time.Duration.fromSecs(1)) catch - @panic("error receiving")); + fn afterSend(hook: *Channel(u64).SendHook, channel: *Channel(u64)) void { + const self: *@This() = @alignCast(@fieldParentPtr("hook", hook)); + const value = channel.tryReceive() orelse @panic("empty channel after send"); + self.collected.append(value) catch @panic("oom"); } }; - var ch = try Channel(u64).init(std.testing.allocator); + const to_send = 100; + const allocator = std.testing.allocator; + + var ch = try Channel(u64).init(allocator); defer ch.deinit(); - const consumer = try std.Thread.spawn(.{}, S.consumer, .{&ch}); - const producer = try std.Thread.spawn(.{}, S.producer, .{&ch}); + // Check that afterSend counts sent channel items. + var counter = Counter{}; + ch.send_hook = &counter.hook; - consumer.join(); - producer.join(); -} + for (0..to_send) |i| try ch.send(i); + try expect(ch.len() == to_send); + try expect(counter.count == to_send); -test "timeout receive times out" { - var ch = try Channel(u64).init(std.testing.allocator); - defer ch.deinit(); - try std.testing.expectEqual(null, try ch.receiveTimeout(sig.time.Duration.fromMillis(10))); + // Check that afterSend consumes any sent values. 
+ var consumer = Consumer{ .collected = std.ArrayList(u64).init(allocator) }; + ch.send_hook = &consumer.hook; + defer consumer.collected.deinit(); + + while (ch.tryReceive()) |_| {} // drain before starting. + for (0..to_send) |i| try ch.send(i); + try expect(ch.isEmpty()); + try expect(consumer.collected.items.len == to_send); } test "mpmc" { diff --git a/src/trace/log.zig b/src/trace/log.zig index bafa421da..659c2ac62 100644 --- a/src/trace/log.zig +++ b/src/trace/log.zig @@ -120,7 +120,7 @@ pub const ChannelPrintLogger = struct { log_allocator: Allocator, log_allocator_state: *RecycleFBA(.{}), max_buffer: u64, - channel: *Channel([]const u8), + channel: Channel([]const u8), handle: ?std.Thread, const Self = @This(); @@ -132,6 +132,7 @@ pub const ChannelPrintLogger = struct { .records_allocator = config.allocator, .bytes_allocator = config.allocator, }, max_buffer); + const self = try config.allocator.create(Self); self.* = .{ .allocator = config.allocator, @@ -141,8 +142,9 @@ pub const ChannelPrintLogger = struct { .exit = AtomicBool.init(false), .max_level = config.max_level, .handle = null, - .channel = try Channel([]const u8).create(config.allocator), + .channel = try Channel([]const u8).init(config.allocator), }; + self.handle = try std.Thread.spawn(.{}, run, .{self}); return self; } @@ -153,9 +155,9 @@ pub const ChannelPrintLogger = struct { self.exit.store(true, .seq_cst); handle.join(); } + self.channel.deinit(); self.log_allocator_state.deinit(); - self.allocator.destroy(self.channel); self.allocator.destroy(self.log_allocator_state); self.allocator.destroy(self); } @@ -169,7 +171,9 @@ pub const ChannelPrintLogger = struct { } pub fn run(self: *Self) void { - while (!self.exit.load(.acquire)) { + while (true) { + self.channel.waitToReceive(.{ .unordered = &self.exit }) catch break; + while (self.channel.tryReceive()) |message| { defer self.log_allocator.free(message); const writer = std.io.getStdErr().writer(); diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index d05fc748d..701d26e48 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -61,6 +61,9 @@ pub const Service = struct { epoch_schedule: EpochSchedule, exit: *AtomicBool, ) !Service { + const send_channel = try Channel(Packet).create(allocator); + errdefer send_channel.destroy(); + return .{ .allocator = allocator, .config = config, @@ -76,7 +79,7 @@ pub const Service = struct { gossip_table_rw, epoch_schedule, )), - .send_channel = try Channel(Packet).create(allocator), + .send_channel = send_channel, .input_channel = input_channel, .logger = logger.withScope(@typeName(Self)), .exit = exit, @@ -120,9 +123,9 @@ pub const Service = struct { var transaction_batch = std.AutoArrayHashMap(Signature, TransactionInfo).init(self.allocator); defer transaction_batch.deinit(); - while (!self.exit.load(.monotonic) or - self.input_channel.len() != 0) - { + while (true) { + self.input_channel.waitToReceive(.{ .unordered = self.exit }) catch break; + while (self.input_channel.tryReceive()) |transaction| { self.metrics.received_count.inc();
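The conversion applied throughout this patch (geyser, gossip, shred_network, trace, transaction_sender) is the same wait-then-drain loop: block on `Channel.waitToReceive` instead of spinning on the exit flag, then drain the channel with `tryReceive`. A minimal sketch of that consumer loop, with a placeholder `Work` payload and `handle` function standing in for the real per-service types:

    const std = @import("std");
    const sig = @import("../sig.zig");

    // Placeholder payload/handler so the sketch is self-contained; the real
    // services receive Packet, log messages, transactions, etc.
    const Work = u64;
    fn handle(work: Work) void {
        std.debug.print("handled {}\n", .{work});
    }

    fn consumerLoop(channel: *sig.sync.Channel(Work), exit: *std.atomic.Value(bool)) void {
        while (true) {
            // Sleeps on the channel's embedded ResetEvent (re-checking `exit`
            // roughly once per second) rather than busy-polling; returns
            // error.Exit once the flag is set, which ends the loop.
            channel.waitToReceive(.{ .unordered = exit }) catch break;

            // Drain everything currently queued before sleeping again.
            while (channel.tryReceive()) |work| {
                handle(work);
            }
        }
    }

Note that `waitToReceive` supports only one receiver thread per channel, which is why `retransmitShreds` (which runs on several threads concurrently) keeps its `tryReceive`-and-continue loop instead.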
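`SocketThread` no longer allocates its own channel: the caller creates the channel, passes it to `spawnSender`/`spawnReceiver` together with an `ExitCondition`, and tears things down with `join()` followed by `destroy()`, as `RepairRequester` now does. A sketch of that ownership, assuming the caller already has a bound `socket`, a `logger`, and an `exit` flag in scope:

    const std = @import("std");
    const network = @import("zig-network");
    const sig = @import("../sig.zig");

    fn senderExample(
        allocator: std.mem.Allocator,
        logger: sig.trace.Logger,
        socket: network.Socket,
        exit: *std.atomic.Value(bool),
    ) !void {
        const packet_channel = try sig.sync.Channel(sig.net.Packet).create(allocator);
        errdefer packet_channel.destroy();

        const sender_thread = try sig.net.SocketThread.spawnSender(
            allocator,
            logger,
            socket,
            packet_channel,
            .{ .unordered = exit },
        );

        // ... producers call packet_channel.send(packet) from here on ...

        // Shutdown order matters: set the exit flag, join the thread so it
        // stops draining the channel, and only then destroy the channel.
        exit.store(true, .monotonic);
        sender_thread.join();
        packet_channel.destroy();
    }

Keeping the channel outside the thread is what lets several producers (or several threads, as with the shared `response_sender` in `ShredReceiver`) feed one channel whose lifetime the owning service controls.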
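`SendHook.after_send` runs on the sending thread immediately after the value is published and the channel's event is set, so a hook owner can layer its own wake-up or bookkeeping on top of a channel (the tests use it to count sends and to consume values as they arrive). A minimal sketch of installing a hook, assuming a hypothetical `Notifier` type and an existing `channel: *sig.sync.Channel(u64)`:

    const Notifier = struct {
        wakeup: std.Thread.ResetEvent = .{},
        hook: sig.sync.Channel(u64).SendHook = .{ .after_send = afterSend },

        fn afterSend(hook: *sig.sync.Channel(u64).SendHook, _: *sig.sync.Channel(u64)) void {
            // Recover the enclosing Notifier from its embedded hook field,
            // the same way the send-hook test does.
            const self: *Notifier = @alignCast(@fieldParentPtr("hook", hook));
            self.wakeup.set();
        }
    };

    var notifier = Notifier{};
    channel.send_hook = &notifier.hook; // install before any sender starts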