Skip to content

Commit

Permalink
wip: fix benchmark + slowdown sender with prints
Browse files Browse the repository at this point in the history
  • Loading branch information
kprotty committed Jan 9, 2025
1 parent 0639ada commit 2a71ca1
Showing 1 changed file with 55 additions and 28 deletions.
83 changes: 55 additions & 28 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ const EventLoop = struct {
const bytes = try result;

const node = maybe_node.?;
// std.debug.print("write finished on {*}\n", .{node});
std.debug.print("write finished on {*}\n", .{node});
//std.time.sleep(10 * std.time.ns_per_ms);

std.debug.assert(node.data.packet.?.size == bytes);
node.data.packet = null;
Expand Down Expand Up @@ -454,44 +455,70 @@ pub const BenchmarkPacketProcessing = struct {

var socket = try UdpSocket.create(.ipv4, .udp);
try socket.bindToPort(0);
try socket.setReadTimeout(std.time.us_per_s); // 1 second

var exit_flag = std.atomic.Value(bool).init(false);
const exit: ExitCondition = .{ .unordered = &exit_flag };
const to_endpoint = try socket.getLocalEndPoint();

const sender = try SocketChannel.initSender(allocator, .noop, socket, exit);
defer sender.deinit(allocator);
var exit_flag = std.atomic.Value(bool).init(false);
const exit_condition = ExitCondition{ .unordered = &exit_flag };

const receiver = try SocketChannel.initReceiver(allocator, .noop, socket, exit);
defer receiver.deinit(allocator);
// Setup incoming

var incoming_channel = try Channel(Packet).init(allocator);
defer incoming_channel.deinit();

const incoming_pipe = try SocketPipe.initReceiver(allocator, .noop, socket, &incoming_channel, exit_condition);
defer incoming_pipe.deinit(allocator);

// Start outgoing

const S = struct {
fn runSender(channel: *Channel(Packet), addr: network.EndPoint, e: ExitCondition) !void {
var i: usize = 0;
var packet: Packet = undefined;
var prng = std.rand.DefaultPrng.init(0);
var timer = try std.time.Timer.start();

while (e.shouldRun()) {
prng.fill(&packet.data);
packet.addr = addr;
packet.size = packet.data.len;
try channel.send(packet);

// 10Kb per second, until one second
// each packet is 1k bytes
// = 10 packets per second
i += 1;
if (i % 10 == 0) {
const elapsed = timer.read();
if (elapsed < std.time.ns_per_s) {
std.time.sleep(std.time.ns_per_s);
}
}
}
}
};

var packets_to_send: usize = n_packets;
var packets_to_recv: usize = n_packets;
var prng = std.rand.DefaultPrng.init(0);
var packet_buf: [PACKET_DATA_SIZE]u8 = undefined;
var timer = try sig.time.Timer.start();
var outgoing_channel = try Channel(Packet).init(allocator);
defer outgoing_channel.deinit();

var push: usize = 0;
while (packets_to_send > 0 or packets_to_recv > 0) {
if (packets_to_send > 0) {
prng.fill(&packet_buf);
try sender.channel.send(Packet.init(to_endpoint, packet_buf, packet_buf.len));
packets_to_send -= 1;
push += 1;
}
const outgoing_pipe = try SocketPipe.initSender(allocator, .noop, socket, &outgoing_channel, exit_condition);
defer outgoing_pipe.deinit(allocator);

// if (push % 10_000 == 0) {
// std.debug.print("push:{} to_send:{} to_recv:{}\n", .{push, packets_to_send, packets_to_recv});
// }
const outgoing_handle = try std.Thread.spawn(.{}, S.runSender, .{&outgoing_channel, to_endpoint, exit_condition});
defer outgoing_handle.join();

while (receiver.channel.tryReceive()) |_| {
packets_to_recv -= 1;
// run incoming until received n_packets

var packets_to_recv = n_packets;
var timer = try sig.time.Timer.start();
while (packets_to_recv > 0) {
while (incoming_channel.tryReceive()) |_| {
packets_to_recv -|= 1;
}

packets_to_send = @max(packets_to_send, packets_to_recv);
}

exit.setExit();
exit_condition.setExit(); // kill benchSender and join it on defer.
return timer.read();
}
};
Expand Down

0 comments on commit 2a71ca1

Please sign in to comment.