Skip to content

Commit

Permalink
use threadlocal io_uring
Browse files Browse the repository at this point in the history
  • Loading branch information
Sobeston committed Jan 16, 2025
1 parent 4675677 commit e8cbcf7
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/accountsdb/buffer_pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ const linux_io_mode: LinuxIoMode = .IoUring;

const use_io_uring = builtin.os.tag == .linux and linux_io_mode == .IoUring;

threadlocal var maybe_io_uring: if (use_io_uring) ?std.os.linux.IoUring else void = null;
fn io_uring() !*std.os.linux.IoUring {
const io_uring_entries = 4096;

return if (maybe_io_uring) |*io_ur|
io_ur
else io_ur: {
maybe_io_uring = try std.os.linux.IoUring.init(
io_uring_entries,
0,
);
// TODO: to deinit reliably we could hook thread exit? Not sure if we even need to.
break :io_ur &(maybe_io_uring.?);
};
}

const FileIdFileOffset = packed struct(u64) {
const INVALID: FileIdFileOffset = .{
.file_id = FileId.fromInt(std.math.maxInt(FileId.Int)),
Expand All @@ -53,12 +69,13 @@ fn readError() type {
};

if (use_io_uring) {
const called_fns = &.{
const extra_fns = &.{
std.os.linux.IoUring.read,
std.os.linux.IoUring.submit_and_wait,
std.os.linux.IoUring.copy_cqes,
std.os.linux.IoUring.init,
};
inline for (called_fns) |func| {
inline for (extra_fns) |func| {
const ErrorSet = @typeInfo(
@typeInfo(@TypeOf(func)).Fn.return_type.?,
).ErrorUnion.error_set;
Expand Down Expand Up @@ -105,9 +122,6 @@ pub const BufferPool = struct {
/// used for eviction to free less popular (rc=0) frames first
eviction_lfu: HierarchicalFIFO,

/// NOTE: we might want this to be a threadlocal for best performance? I don't think this field is threadsafe
io_uring: if (use_io_uring) std.os.linux.IoUring else void,

pub fn init(
init_allocator: std.mem.Allocator,
num_frames: u32,
Expand All @@ -124,18 +138,6 @@ pub const BufferPool = struct {
errdefer free_list.deinit(init_allocator);
for (0..num_frames) |i| free_list.appendAssumeCapacity(@intCast(i));

var io_uring = if (use_io_uring) blk: {
// NOTE: this is pretty much a guess, maybe worth tweaking?
// think this is a bit on the high end, libxev uses 256
const io_uring_entries = 4096;

break :blk try std.os.linux.IoUring.init(
io_uring_entries,
0,
);
} else {};
errdefer if (use_io_uring) io_uring.deinit();

var frame_map: FrameMap = .{};
try frame_map.ensureTotalCapacity(init_allocator, num_frames);
errdefer frame_map.deinit(init_allocator);
Expand All @@ -148,14 +150,12 @@ pub const BufferPool = struct {
.free_list = free_list,
.frame_map_rw = frame_map_rw,
.eviction_lfu = try HierarchicalFIFO.init(init_allocator, num_frames / 10, num_frames),
.io_uring = io_uring,
};
}

pub fn deinit(self: *BufferPool, init_allocator: std.mem.Allocator) void {
init_allocator.free(self.frames);
self.frames_metadata.deinit(init_allocator);
if (use_io_uring) self.io_uring.deinit();
self.free_list.deinit(init_allocator);
self.eviction_lfu.deinit(init_allocator);
const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock();
Expand Down Expand Up @@ -389,7 +389,7 @@ pub const BufferPool = struct {
}
};

_ = try self.io_uring.read(
_ = try (try io_uring()).read(
f_idx.*,
file.handle,
.{ .buffer = &self.frames[f_idx.*] },
Expand All @@ -403,7 +403,7 @@ pub const BufferPool = struct {
// (This read length will almost always be FRAME_SIZE, however it will likely be less than
// that at the end of the file)
if (n_invalid_indices > 0) {
const n_submitted = try self.io_uring.submit_and_wait(n_invalid_indices);
const n_submitted = try (try io_uring()).submit_and_wait(n_invalid_indices);
std.debug.assert(n_submitted == n_invalid_indices); // did smthng else submit an event?

// would be nice to get rid of this alloc
Expand All @@ -412,7 +412,7 @@ pub const BufferPool = struct {

// check our completions in order to set the frame's size;
// we need to wait for completion to get the bytes read
const cqe_count = try self.io_uring.copy_cqes(cqes, n_submitted);
const cqe_count = try (try io_uring()).copy_cqes(cqes, n_submitted);
std.debug.assert(cqe_count == n_submitted); // why did we not receive them all?
for (0.., cqes) |i, cqe| {
if (cqe.err() != .SUCCESS) {
Expand Down

0 comments on commit e8cbcf7

Please sign in to comment.