From e8cbcf7760b496865a1214465b0700abea2a2ba0 Mon Sep 17 00:00:00 2001 From: Sobeston <15335529+Sobeston@users.noreply.github.com> Date: Thu, 16 Jan 2025 02:50:32 +0000 Subject: [PATCH] use threadlocal io_uring --- src/accountsdb/buffer_pool.zig | 44 +++++++++++++++++----------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/accountsdb/buffer_pool.zig b/src/accountsdb/buffer_pool.zig index 77fb2e6de..1bd7b1644 100644 --- a/src/accountsdb/buffer_pool.zig +++ b/src/accountsdb/buffer_pool.zig @@ -28,6 +28,22 @@ const linux_io_mode: LinuxIoMode = .IoUring; const use_io_uring = builtin.os.tag == .linux and linux_io_mode == .IoUring; +threadlocal var maybe_io_uring: if (use_io_uring) ?std.os.linux.IoUring else void = null; +fn io_uring() !*std.os.linux.IoUring { + const io_uring_entries = 4096; + + return if (maybe_io_uring) |*io_ur| + io_ur + else io_ur: { + maybe_io_uring = try std.os.linux.IoUring.init( + io_uring_entries, + 0, + ); + // TODO: to deinit reliably we could hook thread exit? Not sure if we even need to. + break :io_ur &(maybe_io_uring.?); + }; +} + const FileIdFileOffset = packed struct(u64) { const INVALID: FileIdFileOffset = .{ .file_id = FileId.fromInt(std.math.maxInt(FileId.Int)), @@ -53,12 +69,13 @@ fn readError() type { }; if (use_io_uring) { - const called_fns = &.{ + const extra_fns = &.{ std.os.linux.IoUring.read, std.os.linux.IoUring.submit_and_wait, std.os.linux.IoUring.copy_cqes, + std.os.linux.IoUring.init, }; - inline for (called_fns) |func| { + inline for (extra_fns) |func| { const ErrorSet = @typeInfo( @typeInfo(@TypeOf(func)).Fn.return_type.?, ).ErrorUnion.error_set; @@ -105,9 +122,6 @@ pub const BufferPool = struct { /// used for eviction to free less popular (rc=0) frames first eviction_lfu: HierarchicalFIFO, - /// NOTE: we might want this to be a threadlocal for best performance? I don't think this field is threadsafe - io_uring: if (use_io_uring) std.os.linux.IoUring else void, - pub fn init( init_allocator: std.mem.Allocator, num_frames: u32, @@ -124,18 +138,6 @@ pub const BufferPool = struct { errdefer free_list.deinit(init_allocator); for (0..num_frames) |i| free_list.appendAssumeCapacity(@intCast(i)); - var io_uring = if (use_io_uring) blk: { - // NOTE: this is pretty much a guess, maybe worth tweaking? - // think this is a bit on the high end, libxev uses 256 - const io_uring_entries = 4096; - - break :blk try std.os.linux.IoUring.init( - io_uring_entries, - 0, - ); - } else {}; - errdefer if (use_io_uring) io_uring.deinit(); - var frame_map: FrameMap = .{}; try frame_map.ensureTotalCapacity(init_allocator, num_frames); errdefer frame_map.deinit(init_allocator); @@ -148,14 +150,12 @@ pub const BufferPool = struct { .free_list = free_list, .frame_map_rw = frame_map_rw, .eviction_lfu = try HierarchicalFIFO.init(init_allocator, num_frames / 10, num_frames), - .io_uring = io_uring, }; } pub fn deinit(self: *BufferPool, init_allocator: std.mem.Allocator) void { init_allocator.free(self.frames); self.frames_metadata.deinit(init_allocator); - if (use_io_uring) self.io_uring.deinit(); self.free_list.deinit(init_allocator); self.eviction_lfu.deinit(init_allocator); const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); @@ -389,7 +389,7 @@ pub const BufferPool = struct { } }; - _ = try self.io_uring.read( + _ = try (try io_uring()).read( f_idx.*, file.handle, .{ .buffer = &self.frames[f_idx.*] }, @@ -403,7 +403,7 @@ pub const BufferPool = struct { // (This read length will almost always be FRAME_SIZE, however it will likely be less than // that at the end of the file) if (n_invalid_indices > 0) { - const n_submitted = try self.io_uring.submit_and_wait(n_invalid_indices); + const n_submitted = try (try io_uring()).submit_and_wait(n_invalid_indices); std.debug.assert(n_submitted == n_invalid_indices); // did smthng else submit an event? // would be nice to get rid of this alloc @@ -412,7 +412,7 @@ pub const BufferPool = struct { // check our completions in order to set the frame's size; // we need to wait for completion to get the bytes read - const cqe_count = try self.io_uring.copy_cqes(cqes, n_submitted); + const cqe_count = try (try io_uring()).copy_cqes(cqes, n_submitted); std.debug.assert(cqe_count == n_submitted); // why did we not receive them all? for (0.., cqes) |i, cqe| { if (cqe.err() != .SUCCESS) {