From 5ebff6f3448129597cf1fe04b8cf3b9e888cbd2b Mon Sep 17 00:00:00 2001 From: kprotty Date: Fri, 10 Jan 2025 01:17:58 -0600 Subject: [PATCH] style guide + turbine recv pipe cleanup --- src/net/socket_utils.zig | 29 ++++++++++++++++------- src/shred_network/shred_receiver.zig | 35 +++++++++++++++++++++++----- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 2807e5a4f..6e5088a3f 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -1,7 +1,6 @@ const std = @import("std"); const builtin = @import("builtin"); const Allocator = std.mem.Allocator; -const Atomic = std.atomic.Value; const sig = @import("../sig.zig"); const Packet = sig.net.Packet; @@ -46,10 +45,18 @@ pub const SocketPipe = struct { .sender => { self.outgoing_signal = .{}; channel.send_hook = &self.outgoing_signal.hook; - self.handle = try std.Thread.spawn(.{}, runSender, .{ self, logger, socket, channel, exit }); + self.handle = try std.Thread.spawn( + .{}, + runSender, + .{ self, logger, socket, channel, exit }, + ); }, .receiver => { - self.handle = try std.Thread.spawn(.{}, runReceiver, .{ logger, socket, channel, exit }); + self.handle = try std.Thread.spawn( + .{}, + runReceiver, + .{ logger, socket, channel, exit }, + ); }, } @@ -160,13 +167,14 @@ pub const BenchmarkPacketProcessing = struct { var incoming_signal: Channel(Packet).SendSignal = .{}; incoming_channel.send_hook = &incoming_signal.hook; - const incoming_pipe = try SocketPipe.init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition); + const incoming_pipe = try SocketPipe + .init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition); defer incoming_pipe.deinit(allocator); // Start outgoing const S = struct { - fn runSender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void { + fn sender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void { var i: usize = 0; var packet: Packet = undefined; var prng = std.rand.DefaultPrng.init(0); @@ -175,7 +183,7 @@ pub const BenchmarkPacketProcessing = struct { while (e.shouldRun()) { prng.fill(&packet.data); packet.addr = addr; - packet.size = packet.data.len; + packet.size = PACKET_DATA_SIZE; try channel.send(packet); // 10Kb per second, until one second @@ -195,10 +203,15 @@ pub const BenchmarkPacketProcessing = struct { var outgoing_channel = try Channel(Packet).init(allocator); defer outgoing_channel.deinit(); - const outgoing_pipe = try SocketPipe.init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition); + const outgoing_pipe = try SocketPipe + .init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition); defer outgoing_pipe.deinit(allocator); - const outgoing_handle = try std.Thread.spawn(.{}, S.runSender, .{ &outgoing_channel, to_endpoint, exit_condition }); + const outgoing_handle = try std.Thread.spawn( + .{}, + S.sender, + .{ &outgoing_channel, to_endpoint, exit_condition }, + ); defer outgoing_handle.join(); // run incoming until received n_packets diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig index 9cea92128..8d41b8f9c 100644 --- a/src/shred_network/shred_receiver.zig +++ b/src/shred_network/shred_receiver.zig @@ -54,7 +54,15 @@ pub const ShredReceiver = struct { // Cretae pipe from response_sender (SocketPipe overrides .send_hook) -> repair_socket const response_sender = try Channel(Packet).create(self.allocator); defer response_sender.destroy(); - const response_sender_pipe = try SocketPipe.init(self.allocator, .sender, self.logger.unscoped(), self.repair_socket, response_sender, exit); + + const response_sender_pipe = try SocketPipe.init( + self.allocator, + .sender, + self.logger.unscoped(), + self.repair_socket, + response_sender, + exit, + ); defer response_sender_pipe.deinit(self.allocator); // Create pipe from repair_socket -> response_receiver (SendSignal overrides .send_hook) @@ -62,7 +70,14 @@ pub const ShredReceiver = struct { response_receiver.send_hook = &receive_signal.hook; defer response_receiver.destroy(); - const response_receiver_pipe = try SocketPipe.init(self.allocator, .receiver, self.logger.unscoped(), self.repair_socket, response_receiver, exit); + const response_receiver_pipe = try SocketPipe.init( + self.allocator, + .receiver, + self.logger.unscoped(), + self.repair_socket, + response_receiver, + exit, + ); defer response_receiver_pipe.deinit(self.allocator); // Create N pipes from turbine_socket -> turbine_receiver (SendSignal overrides .send_hook) @@ -70,11 +85,19 @@ pub const ShredReceiver = struct { turbine_receiver.send_hook = &receive_signal.hook; defer turbine_receiver.destroy(); - var turbine_receiver_pipes: [NUM_TVU_RECEIVERS]*SocketPipe = undefined; - for (&turbine_receiver_pipes) |*pipe| { - pipe.* = try SocketPipe.init(self.allocator, .receiver, self.logger.unscoped(), self.turbine_socket, turbine_receiver, .{ .unordered = self.exit }); + var turbine_receiver_pipes: std.BoundedArray(*SocketPipe, NUM_TVU_RECEIVERS) = .{}; + defer for (turbine_receiver_pipes.slice()) |pipe| pipe.deinit(self.allocator); + + for (0..NUM_TVU_RECEIVERS) |_| { + try turbine_receiver_pipes.append(try SocketPipe.init( + self.allocator, + .receiver, + self.logger.unscoped(), + self.turbine_socket, + turbine_receiver, + .{ .unordered = self.exit }, + )); } - defer for (turbine_receiver_pipes) |pipe| pipe.deinit(self.allocator); // Run thread to handle incoming packets. Stops when exit is set. while (true) {