Skip to content

Commit

Permalink
Improve & simplify socket_utils
Browse files Browse the repository at this point in the history
* Rename `SOCKET_TIMEOUT` to `SOCKET_TIMEOUT_US`, to signify its
  magnitude is in microseconds.

* Remove `recvMmsg` and simplify the timeout logic to simply setting
  it directly before the loop, and finishing the batch on the first
  `error.WouldBlock` when the batch isn't empty.

* Use `packet_batch` as an actual arraylist instead of as a weird slice.
  • Loading branch information
InKryption committed May 31, 2024
1 parent 3b7b21b commit 0fdd396
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 69 deletions.
4 changes: 2 additions & 2 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const requestIpEcho = @import("../net/echo.zig").requestIpEcho;
const servePrometheus = @import("../prometheus/http.zig").servePrometheus;
const parallelUnpackZstdTarBall = @import("../accountsdb/snapshots.zig").parallelUnpackZstdTarBall;
const downloadSnapshotsFromGossip = @import("../accountsdb/download.zig").downloadSnapshotsFromGossip;
const SOCKET_TIMEOUT = @import("../net/socket_utils.zig").SOCKET_TIMEOUT;
const SOCKET_TIMEOUT_US = @import("../net/socket_utils.zig").SOCKET_TIMEOUT_US;

const config = @import("config.zig");
// var validator_config = config.current;
Expand Down Expand Up @@ -411,7 +411,7 @@ fn validator() !void {
// repair
var repair_socket = try Socket.create(network.AddressFamily.ipv4, network.Protocol.udp);
try repair_socket.bindToPort(repair_port);
try repair_socket.setReadTimeout(SOCKET_TIMEOUT);
try repair_socket.setReadTimeout(SOCKET_TIMEOUT_US);

var repair_svc = try initRepair(
logger,
Expand Down
2 changes: 1 addition & 1 deletion src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub const GossipService = struct {
const gossip_address = my_contact_info.getSocket(socket_tag.GOSSIP) orelse return error.GossipAddrUnspecified;
var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed;
gossip_socket.bindToPort(gossip_address.port()) catch return error.SocketBindFailed;
gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT) catch return error.SocketSetTimeoutFailed; // 1 second
gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT_US) catch return error.SocketSetTimeoutFailed; // 1 second

const failed_pull_hashes = HashTimeQueue.init(allocator);
const push_msg_q = ArrayList(SignedGossipData).init(allocator);
Expand Down
87 changes: 21 additions & 66 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const Channel = @import("../sync/channel.zig").Channel;
const std = @import("std");
const Logger = @import("../trace/log.zig").Logger;

pub const SOCKET_TIMEOUT: usize = 1000000;
pub const SOCKET_TIMEOUT_US: usize = 1 * std.time.us_per_s;
pub const PACKETS_PER_BATCH: usize = 64;

pub fn readSocket(
Expand All @@ -17,96 +17,51 @@ pub fn readSocket(
exit: *const std.atomic.Value(bool),
logger: Logger,
) !void {
//Performance out of the IO without poll
// Performance out of the IO without poll
// * block on the socket until it's readable
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning

const MAX_WAIT_NS = std.time.ns_per_ms; // 1ms
try socket.setReadTimeout(SOCKET_TIMEOUT_US);

while (!exit.load(.unordered)) {
// init a new batch
var count: usize = 0;
const capacity = PACKETS_PER_BATCH;
// var count: usize = 0;
// const capacity = PACKETS_PER_BATCH;
var packet_batch = try std.ArrayList(Packet).initCapacity(
allocator,
capacity,
PACKETS_PER_BATCH,
);
packet_batch.appendNTimesAssumeCapacity(Packet.default(), capacity);
errdefer packet_batch.deinit();

// NOTE: usually this would be null (ie, blocking)
// but in order to exit cleanly in tests - we set to 1 second
try socket.setReadTimeout(std.time.ms_per_s);
var timer = std.time.Timer.start() catch unreachable;

// recv packets into batch
while (true) {
const n_packets_read = recvMmsg(socket, packet_batch.items[count..capacity], exit) catch |err| {
if (count > 0 and err == error.WouldBlock) {
if (timer.read() > MAX_WAIT_NS) {
break;
}
}
continue;
while (packet_batch.items.len != packet_batch.capacity) {
var packet: Packet = Packet.default();
const recv_meta = socket.receiveFrom(&packet.data) catch |err| switch (err) {
error.WouldBlock => {
if (packet_batch.items.len > 0) break;
continue;
},
else => |e| return e,
};

if (count == 0) {
// set to nonblocking mode
try socket.setReadTimeout(SOCKET_TIMEOUT);
}
count += n_packets_read;
if (timer.read() > MAX_WAIT_NS or count >= capacity) {
break;
}
const bytes_read = recv_meta.numberOfBytes;
if (bytes_read == 0) return error.SocketClosed;
packet.addr = recv_meta.sender;
packet.size = bytes_read;
packet_batch.appendAssumeCapacity(packet);
}

if (count < capacity) {
packet_batch.shrinkAndFree(count);
}
packet_batch.shrinkAndFree(packet_batch.items.len);
try incoming_channel.send(packet_batch);
}
logger.debugf("readSocket loop closed", .{});
}

pub fn recvMmsg(
socket: *UdpSocket,
/// pre-allocated array of packets to fill up
packet_batch: []Packet,
exit: *const std.atomic.Value(bool),
) !usize {
const max_size = packet_batch.len;
var count: usize = 0;

while (count < max_size) {
var packet = &packet_batch[count];
const recv_meta = socket.receiveFrom(&packet.data) catch |err| {
// would block then return
if (count > 0 and err == error.WouldBlock) {
break;
} else {
if (exit.load(.unordered)) return 0;
continue;
}
};

const bytes_read = recv_meta.numberOfBytes;
if (bytes_read == 0) {
return error.SocketClosed;
}
packet.addr = recv_meta.sender;
packet.size = bytes_read;

if (count == 0) {
// nonblocking mode
try socket.setReadTimeout(SOCKET_TIMEOUT);
}
count += 1;
}

return count;
}

pub fn sendSocket(
socket: *UdpSocket,
outgoing_channel: *Channel(std.ArrayList(Packet)),
Expand Down

0 comments on commit 0fdd396

Please sign in to comment.