Skip to content

Commit

Permalink
style guide + turbine recv pipe cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kprotty committed Jan 10, 2025
1 parent 79c51b1 commit 5ebff6f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
29 changes: 21 additions & 8 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const std = @import("std");
const builtin = @import("builtin");
const Allocator = std.mem.Allocator;
const Atomic = std.atomic.Value;

const sig = @import("../sig.zig");
const Packet = sig.net.Packet;
Expand Down Expand Up @@ -46,10 +45,18 @@ pub const SocketPipe = struct {
.sender => {
self.outgoing_signal = .{};
channel.send_hook = &self.outgoing_signal.hook;
self.handle = try std.Thread.spawn(.{}, runSender, .{ self, logger, socket, channel, exit });
self.handle = try std.Thread.spawn(
.{},
runSender,
.{ self, logger, socket, channel, exit },
);
},
.receiver => {
self.handle = try std.Thread.spawn(.{}, runReceiver, .{ logger, socket, channel, exit });
self.handle = try std.Thread.spawn(
.{},
runReceiver,
.{ logger, socket, channel, exit },
);
},
}

Expand Down Expand Up @@ -160,13 +167,14 @@ pub const BenchmarkPacketProcessing = struct {
var incoming_signal: Channel(Packet).SendSignal = .{};
incoming_channel.send_hook = &incoming_signal.hook;

const incoming_pipe = try SocketPipe.init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition);
const incoming_pipe = try SocketPipe
.init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition);
defer incoming_pipe.deinit(allocator);

// Start outgoing

const S = struct {
fn runSender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void {
fn sender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void {
var i: usize = 0;
var packet: Packet = undefined;
var prng = std.rand.DefaultPrng.init(0);
Expand All @@ -175,7 +183,7 @@ pub const BenchmarkPacketProcessing = struct {
while (e.shouldRun()) {
prng.fill(&packet.data);
packet.addr = addr;
packet.size = packet.data.len;
packet.size = PACKET_DATA_SIZE;
try channel.send(packet);

// 10Kb per second, until one second
Expand All @@ -195,10 +203,15 @@ pub const BenchmarkPacketProcessing = struct {
var outgoing_channel = try Channel(Packet).init(allocator);
defer outgoing_channel.deinit();

const outgoing_pipe = try SocketPipe.init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition);
const outgoing_pipe = try SocketPipe
.init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition);
defer outgoing_pipe.deinit(allocator);

const outgoing_handle = try std.Thread.spawn(.{}, S.runSender, .{ &outgoing_channel, to_endpoint, exit_condition });
const outgoing_handle = try std.Thread.spawn(
.{},
S.sender,
.{ &outgoing_channel, to_endpoint, exit_condition },
);
defer outgoing_handle.join();

// run incoming until received n_packets
Expand Down
35 changes: 29 additions & 6 deletions src/shred_network/shred_receiver.zig
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,50 @@ pub const ShredReceiver = struct {
// Cretae pipe from response_sender (SocketPipe overrides .send_hook) -> repair_socket
const response_sender = try Channel(Packet).create(self.allocator);
defer response_sender.destroy();
const response_sender_pipe = try SocketPipe.init(self.allocator, .sender, self.logger.unscoped(), self.repair_socket, response_sender, exit);

const response_sender_pipe = try SocketPipe.init(
self.allocator,
.sender,
self.logger.unscoped(),
self.repair_socket,
response_sender,
exit,
);
defer response_sender_pipe.deinit(self.allocator);

// Create pipe from repair_socket -> response_receiver (SendSignal overrides .send_hook)
const response_receiver = try Channel(Packet).create(self.allocator);
response_receiver.send_hook = &receive_signal.hook;
defer response_receiver.destroy();

const response_receiver_pipe = try SocketPipe.init(self.allocator, .receiver, self.logger.unscoped(), self.repair_socket, response_receiver, exit);
const response_receiver_pipe = try SocketPipe.init(
self.allocator,
.receiver,
self.logger.unscoped(),
self.repair_socket,
response_receiver,
exit,
);
defer response_receiver_pipe.deinit(self.allocator);

// Create N pipes from turbine_socket -> turbine_receiver (SendSignal overrides .send_hook)
const turbine_receiver = try Channel(Packet).create(self.allocator);
turbine_receiver.send_hook = &receive_signal.hook;
defer turbine_receiver.destroy();

var turbine_receiver_pipes: [NUM_TVU_RECEIVERS]*SocketPipe = undefined;
for (&turbine_receiver_pipes) |*pipe| {
pipe.* = try SocketPipe.init(self.allocator, .receiver, self.logger.unscoped(), self.turbine_socket, turbine_receiver, .{ .unordered = self.exit });
var turbine_receiver_pipes: std.BoundedArray(*SocketPipe, NUM_TVU_RECEIVERS) = .{};
defer for (turbine_receiver_pipes.slice()) |pipe| pipe.deinit(self.allocator);

for (0..NUM_TVU_RECEIVERS) |_| {
try turbine_receiver_pipes.append(try SocketPipe.init(
self.allocator,
.receiver,
self.logger.unscoped(),
self.turbine_socket,
turbine_receiver,
.{ .unordered = self.exit },
));
}
defer for (turbine_receiver_pipes) |pipe| pipe.deinit(self.allocator);

// Run thread to handle incoming packets. Stops when exit is set.
while (true) {
Expand Down

0 comments on commit 5ebff6f

Please sign in to comment.