From 2b114716fcb749034979c26fae11db87053af23a Mon Sep 17 00:00:00 2001 From: Sobeston <15335529+Sobeston@users.noreply.github.com> Date: Thu, 16 Jan 2025 03:51:20 +0000 Subject: [PATCH] frames metadata: use atomics for all fields --- src/accountsdb/buffer_pool.zig | 98 ++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 46 deletions(-) diff --git a/src/accountsdb/buffer_pool.zig b/src/accountsdb/buffer_pool.zig index 1bd7b1644..13f6dff92 100644 --- a/src/accountsdb/buffer_pool.zig +++ b/src/accountsdb/buffer_pool.zig @@ -244,12 +244,11 @@ pub const BufferPool = struct { size: FrameOffset, ) error{CannotOverwriteAliveInfo}!void { try self.overwriteDeadFrameInfoNoSize(f_idx, file_id, frame_aligned_file_offset); - self.frames_metadata.size[f_idx] = size; + self.frames_metadata.size[f_idx].store(size, .release); } /// Useful if you don't currently know the size. /// make sure to set the size later (!) - /// TODO: atomics fn overwriteDeadFrameInfoNoSize( self: *BufferPool, f_idx: FrameIndex, @@ -264,13 +263,13 @@ pub const BufferPool = struct { } self.frames_metadata.freqSetToZero(f_idx); - self.frames_metadata.in_queue[f_idx] = .none; + self.frames_metadata.in_queue[f_idx].store(.none, .release); self.frames_metadata.rc[f_idx].reset(); - self.frames_metadata.key[f_idx] = .{ + self.frames_metadata.key[f_idx].store(@bitCast(FileIdFileOffset{ .file_id = file_id, .file_offset = frame_aligned_file_offset, - }; + }), .release); const key: FileIdFileOffset = .{ .file_id = file_id, @@ -293,7 +292,7 @@ pub const BufferPool = struct { const did_remove = blk: { const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); defer frame_map_lg.unlock(); - break :blk frame_map.remove(self.frames_metadata.key[evicted]); + break :blk frame_map.remove(@bitCast(self.frames_metadata.key[evicted].load(.acquire))); }; if (!did_remove) { std.debug.panic( @@ -427,8 +426,7 @@ pub const BufferPool = struct { const bytes_read: FrameOffset = @intCast(cqe.res); std.debug.assert(bytes_read <= FRAME_SIZE); - // TODO: atomics - self.frames_metadata.size[f_idx] = bytes_read; + self.frames_metadata.size[f_idx].store(bytes_read, .release); } } @@ -510,27 +508,29 @@ pub const BufferPool = struct { } }; -/// TODO: atomics on all index accesses pub const FramesMetadata = struct { - pub const InQueue = enum(u2) { none, small, main, ghost }; + pub const InQueue = enum(u8) { none, small, main, ghost }; // u8 required for extern usage /// ref count for the frame. For frames that are currently being used elsewhere. rc: []sig.sync.ReferenceCounter, /// effectively the inverse of BufferPool.FrameMap, used in order to /// evict keys by their value - key: []FileIdFileOffset, + /// This is really a FileIdFileOffset. + /// TODO: Zig 0.14 (#20590) seems to let us to do atomics on packed structs directly. + key: []std.atomic.Value(u64), - /// frequency for the S3_FIFO + /// frequency for the HierarchicalFIFO /// Yes, really, only 0, 1, 2, 3. /// Atomic - do not access directly. freq: []u2, - /// which S3_FIFO queue this frame exists in - in_queue: []InQueue, + /// which HierarchicalFIFO queue this frame exists in + in_queue: []std.atomic.Value(InQueue), /// 0..=512 - size: []FrameOffset, + /// This is really a FrameOffset, but I've upped it to a u16 to appease std.atomic + size: []std.atomic.Value(u16), fn init(allocator: std.mem.Allocator, num_frames: usize) !FramesMetadata { const rc = try allocator.alignedAlloc( @@ -541,21 +541,33 @@ pub const FramesMetadata = struct { errdefer allocator.free(rc); @memset(rc, .{ .state = .{ .raw = 0 } }); - const key = try allocator.alignedAlloc(FileIdFileOffset, std.mem.page_size, num_frames); + const key = try allocator.alignedAlloc( + std.atomic.Value(u64), + std.mem.page_size, + num_frames, + ); errdefer allocator.free(key); - @memset(key, .{ .file_id = FileId.fromInt(0), .file_offset = 0 }); + @memset( + key, + std.atomic.Value(u64).init( + @bitCast(FileIdFileOffset{ + .file_id = FileId.fromInt(0), + .file_offset = 0, + }), + ), + ); const freq = try allocator.alignedAlloc(u2, std.mem.page_size, num_frames); errdefer allocator.free(freq); @memset(freq, 0); - const in_queue = try allocator.alignedAlloc(InQueue, std.mem.page_size, num_frames); + const in_queue = try allocator.alignedAlloc(std.atomic.Value(InQueue), std.mem.page_size, num_frames); errdefer allocator.free(in_queue); - @memset(in_queue, .none); + @memset(in_queue, std.atomic.Value(InQueue).init(.none)); - const size = try allocator.alignedAlloc(FrameOffset, std.mem.page_size, num_frames); + const size = try allocator.alignedAlloc(std.atomic.Value(u16), std.mem.page_size, num_frames); errdefer allocator.free(size); - @memset(size, 0); + @memset(size, std.atomic.Value(u16).init(0)); return .{ .rc = rc, @@ -586,16 +598,15 @@ pub const FramesMetadata = struct { // TODO: this should *all* be atomic (!) fn resetFrame(self: FramesMetadata, index: FrameIndex) error{CannotResetAlive}!void { if (self.rc[index].isAlive()) return error.CannotResetAlive; - self.freq[index] = 0; - self.in_queue[index] = .none; - self.size[index] = 0; - self.key[index] = FileIdFileOffset.INVALID; + + self.freqSetToZero(index); + self.in_queue[index].store(.none, .release); + self.size[index].store(0, .release); + self.key[index].store(@bitCast(FileIdFileOffset.INVALID), .release); } fn freqIncrement(self: FramesMetadata, index: FrameIndex) void { - // note for reviewers: I myself do not trust this code, have a think about it - - const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acq_rel); + const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acquire); if (old_freq == 0) { // we overflowed (3->0), set back to max @atomicStore(u2, &self.freq[index], 3, .release); @@ -603,9 +614,7 @@ pub const FramesMetadata = struct { } fn freqDecrement(self: FramesMetadata, index: FrameIndex) void { - // note for reviewers: I myself do not trust this code, have a think about it - - const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acq_rel); + const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acquire); if (old_freq == 3) { // we overflowed (0->3), set back to min @atomicStore(u2, &self.freq[index], 0, .release); @@ -613,18 +622,15 @@ pub const FramesMetadata = struct { } fn freqIsZero(self: FramesMetadata, index: FrameIndex) bool { - // note for reviewers: I myself do not trust this code, have a think about it const freq = @atomicLoad(u2, &self.freq[index], .acquire); return freq == 0; } fn freqSetToOne(self: FramesMetadata, index: FrameIndex) void { - // note for reviewers: I myself do not trust this code, have a think about it @atomicStore(u2, &self.freq[index], 1, .release); } fn freqSetToZero(self: FramesMetadata, index: FrameIndex) void { - // note for reviewers: I myself do not trust this code, have a think about it @atomicStore(u2, &self.freq[index], 0, .release); } }; @@ -693,7 +699,7 @@ pub const HierarchicalFIFO = struct { ) error{InvalidKey}!void { if (key == INVALID_FRAME) return error.InvalidKey; - switch (metadata.in_queue[key]) { + switch (metadata.in_queue[key].load(.acquire)) { .main, .small => { metadata.freqIncrement(key); }, @@ -703,7 +709,7 @@ pub const HierarchicalFIFO = struct { // Add key to main too - important to note that the key *still* // exists within ghost, but from now on we'll ignore that entry. self.main.writeItemAssumeCapacity(key); - metadata.in_queue[key] = .main; + metadata.in_queue[key].store(.main, .release); }, .none => { if (self.small.writableLengthIsZero()) { @@ -711,14 +717,14 @@ pub const HierarchicalFIFO = struct { if (metadata.freqIsZero(popped_small)) { self.ghost.writeItemAssumeCapacity(popped_small); - metadata.in_queue[popped_small] = .ghost; + metadata.in_queue[popped_small].store(.ghost, .release); } else { self.main.writeItemAssumeCapacity(popped_small); - metadata.in_queue[popped_small] = .main; + metadata.in_queue[popped_small].store(.main, .release); } } self.small.writeItemAssumeCapacity(key); - metadata.in_queue[key] = .small; + metadata.in_queue[key].store(.small, .release); }, } } @@ -758,13 +764,13 @@ pub const HierarchicalFIFO = struct { if (metadata.rc[evicted].isAlive()) { metadata.freqSetToOne(evicted); self.main.writeItemAssumeCapacity(evicted); - metadata.in_queue[evicted] = .main; + metadata.in_queue[evicted].store(.main, .release); alive_eviction_attempts += 1; continue; } // key is definitely dead - metadata.in_queue[evicted] = .none; + metadata.in_queue[evicted].store(.none, .release); break evicted; }; @@ -773,7 +779,7 @@ pub const HierarchicalFIFO = struct { fn evictGhost(self: *HierarchicalFIFO, metadata: Metadata) ?Key { const evicted: ?Key = while (self.ghost.readItem()) |ghost_key| { - switch (metadata.in_queue[ghost_key]) { + switch (metadata.in_queue[ghost_key].load(.acquire)) { .ghost => { break ghost_key; }, @@ -806,8 +812,8 @@ pub const HierarchicalFIFO = struct { const evicted: ?Key = while (queue.readItem()) |popped_key| { switch (target_queue) { - .small => if (metadata.in_queue[popped_key] != .small) unreachable, - .main => if (metadata.in_queue[popped_key] != .main) unreachable, + .small => if (metadata.in_queue[popped_key].load(.acquire) != .small) unreachable, + .main => if (metadata.in_queue[popped_key].load(.acquire) != .main) unreachable, } if (metadata.freqIsZero(popped_key)) { @@ -1544,7 +1550,7 @@ test "BufferPool filesize > frame_size * num_frames" { read_frame.inner.cached.frame_indices[0] ][0..bp.frames_metadata.size[ read_frame.inner.cached.frame_indices[0] - ]]; + ].load(.unordered)]; var frame2: [FRAME_SIZE]u8 = undefined; const bytes_read = try file.preadAll(&frame2, offset); @@ -1623,7 +1629,7 @@ test "BufferPool random read" { var total_bytes_read: u32 = 0; for (read.inner.cached.frame_indices) |f_idx| { - total_bytes_read += bp.frames_metadata.size[f_idx]; + total_bytes_read += bp.frames_metadata.size[f_idx].load(.unordered); } const read_data_bp_iter = try allocator.alloc(u8, read.len()); defer allocator.free(read_data_bp_iter);