Skip to content

Commit

Permalink
Merge branch 'dnut/fix/shred-network/keepup' into dnut/fix/shred-netw…
Browse files Browse the repository at this point in the history
…ork/leak
  • Loading branch information
dnut committed Jan 22, 2025
2 parents f2a99d8 + c9e0104 commit 7f38b9b
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 337 deletions.
2 changes: 1 addition & 1 deletion src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@ pub fn testTransactionSenderService() !void {

// setup channel for communication to the tx-sender service
const transaction_channel = try sig.sync.Channel(sig.transaction_sender.TransactionInfo).create(allocator);
defer transaction_channel.deinit();
defer transaction_channel.destroy();

// this handles transactions and forwards them to leaders TPU ports
var transaction_sender_service = try sig.transaction_sender.Service.init(
Expand Down
4 changes: 3 additions & 1 deletion src/geyser/core.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ pub const GeyserWriter = struct {
}

pub fn IOStreamLoop(self: *Self) !void {
while (!self.exit.load(.acquire)) {
while (true) {
self.io_channel.waitToReceive(.{ .unordered = self.exit }) catch break;

while (self.io_channel.tryReceive()) |payload| {
_ = self.writeToPipe(payload) catch |err| {
if (err == WritePipeError.PipeBlockedWithExitSignaled) {
Expand Down
4 changes: 3 additions & 1 deletion src/geyser/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ pub fn csvDumpIOWriter(
var timer = try sig.time.Timer.start();
errdefer exit.store(true, .monotonic);

while (!exit.load(.monotonic)) {
while (true) {
io_channel.waitToReceive(.{ .unordered = exit }) catch break;

while (io_channel.tryReceive()) |csv_row| {
// write to file
try csv_file.writeAll(csv_row);
Expand Down
63 changes: 38 additions & 25 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr;
const ServiceManager = sig.utils.service_manager.ServiceManager;
const Duration = sig.time.Duration;
const ExitCondition = sig.sync.ExitCondition;
const SocketThread = sig.net.SocketThread;

const endpointToString = sig.net.endpointToString;
const globalRegistry = sig.prometheus.globalRegistry;
Expand Down Expand Up @@ -104,7 +105,7 @@ const GOSSIP_PRNG_SEED = 19;

/// The flow of data goes as follows:
///
/// `readSocket` ->
/// `SocketThread.initReceiver` ->
/// - reads from the gossip socket
/// - puts the new packet onto `packet_incoming_channel`
/// - repeat until exit
Expand All @@ -120,14 +121,14 @@ const GOSSIP_PRNG_SEED = 19;
/// - processes the verified message it has received
/// - depending on the type of message received, it may put something onto `packet_outgoing_channel`
///
/// `sendSocket` ->
/// `SocketThread.initSender` ->
/// - receives from `packet_outgoing_channel`
/// - sends the outgoing packet onto the gossip socket
/// - repeats while `exit` is false and `packet_outgoing_channel`
/// - when `sendSocket` sees that `exit` has become `true`, it will begin waiting on
/// - when `SocketThread` sees that `exit` has become `true`, it will begin waiting on
/// the previous thing in the chain to close, that usually being `processMessages`.
/// this ensures that `processMessages` doesn't add new items to `packet_outgoing_channel`
/// after the `sendSocket` thread exits.
/// after the `SocketThread` exits.
///
pub const GossipService = struct {
/// used for general allocation purposes
Expand All @@ -148,6 +149,11 @@ pub const GossipService = struct {
/// Indicates if the gossip service is closed.
closed: bool,

/// Piping data between the gossip_socket and the channels.
/// Set to null until start() is called as they represent threads.
incoming_socket_thread: ?*SocketThread = null,
outgoing_socket_thread: ?*SocketThread = null,

/// communication between threads
packet_incoming_channel: *Channel(Packet),
packet_outgoing_channel: *Channel(Packet),
Expand Down Expand Up @@ -218,13 +224,13 @@ pub const GossipService = struct {

// setup channels for communication between threads
var packet_incoming_channel = try Channel(Packet).create(allocator);
errdefer packet_incoming_channel.deinit();
errdefer packet_incoming_channel.destroy();

var packet_outgoing_channel = try Channel(Packet).create(allocator);
errdefer packet_outgoing_channel.deinit();
errdefer packet_outgoing_channel.destroy();

var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator);
errdefer verified_incoming_channel.deinit();
errdefer verified_incoming_channel.destroy();

// setup the socket (bind with read-timeout)
const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified;
Expand Down Expand Up @@ -328,19 +334,20 @@ pub const GossipService = struct {
// wait for all threads to shutdown correctly
self.service_manager.deinit();

// Wait for pipes to shutdown if any
if (self.incoming_socket_thread) |thread| thread.join();
if (self.outgoing_socket_thread) |thread| thread.join();

// assert the channels are empty in order to make sure no data was lost.
// everything should be cleaned up when the thread-pool joins.
std.debug.assert(self.packet_incoming_channel.len() == 0);
self.packet_incoming_channel.deinit();
self.allocator.destroy(self.packet_incoming_channel);
std.debug.assert(self.packet_incoming_channel.isEmpty());
self.packet_incoming_channel.destroy();

std.debug.assert(self.packet_outgoing_channel.len() == 0);
self.packet_outgoing_channel.deinit();
self.allocator.destroy(self.packet_outgoing_channel);
std.debug.assert(self.packet_outgoing_channel.isEmpty());
self.packet_outgoing_channel.destroy();

std.debug.assert(self.verified_incoming_channel.len() == 0);
self.verified_incoming_channel.deinit();
self.allocator.destroy(self.verified_incoming_channel);
std.debug.assert(self.verified_incoming_channel.isEmpty());
self.verified_incoming_channel.destroy();

self.gossip_socket.close();

Expand Down Expand Up @@ -387,7 +394,7 @@ pub const GossipService = struct {
pub fn start(
self: *Self,
params: RunThreadsParams,
) (std.mem.Allocator.Error || std.Thread.SpawnError)!void {
) !void {
// NOTE: this is stack copied on each spawn() call below so we can modify it without
// affecting other threads
var exit_condition = sig.sync.ExitCondition{
Expand All @@ -397,12 +404,13 @@ pub const GossipService = struct {
},
};

try self.service_manager.spawn("[gossip] readSocket", socket_utils.readSocket, .{
self.incoming_socket_thread = try SocketThread.spawnReceiver(
self.allocator,
self.logger.unscoped(),
self.gossip_socket,
self.packet_incoming_channel,
self.logger.unscoped(),
exit_condition,
});
);
exit_condition.ordered.exit_index += 1;

try self.service_manager.spawn("[gossip] verifyPackets", verifyPackets, .{
Expand All @@ -427,12 +435,13 @@ pub const GossipService = struct {
exit_condition.ordered.exit_index += 1;
}

try self.service_manager.spawn("[gossip] sendSocket", socket_utils.sendSocket, .{
self.outgoing_socket_thread = try SocketThread.spawnSender(
self.allocator,
self.logger.unscoped(),
self.gossip_socket,
self.packet_outgoing_channel,
self.logger.unscoped(),
exit_condition,
});
);
exit_condition.ordered.exit_index += 1;

if (params.dump) {
Expand Down Expand Up @@ -514,7 +523,9 @@ pub const GossipService = struct {
}

// loop until the previous service closes and triggers us to close
while (exit_condition.shouldRun()) {
while (true) {
self.packet_incoming_channel.waitToReceive(exit_condition) catch break;

// verify in parallel using the threadpool
// PERF: investigate CPU pinning
var task_search_start_idx: usize = 0;
Expand Down Expand Up @@ -609,7 +620,9 @@ pub const GossipService = struct {
// keep waiting for new data until,
// - `exit` isn't set,
// - there isn't any data to process in the input channel, in order to block the join until we've finished
while (exit_condition.shouldRun()) {
while (true) {
self.verified_incoming_channel.waitToReceive(exit_condition) catch break;

var msg_count: usize = 0;
while (self.verified_incoming_channel.tryReceive()) |message| {
msg_count += 1;
Expand Down
Loading

0 comments on commit 7f38b9b

Please sign in to comment.