Skip to content

Commit

Permalink
frames metadata: use atomics for all fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Sobeston committed Jan 16, 2025
1 parent e8cbcf7 commit 2b11471
Showing 1 changed file with 52 additions and 46 deletions.
98 changes: 52 additions & 46 deletions src/accountsdb/buffer_pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,11 @@ pub const BufferPool = struct {
size: FrameOffset,
) error{CannotOverwriteAliveInfo}!void {
try self.overwriteDeadFrameInfoNoSize(f_idx, file_id, frame_aligned_file_offset);
self.frames_metadata.size[f_idx] = size;
self.frames_metadata.size[f_idx].store(size, .release);
}

/// Useful if you don't currently know the size.
/// make sure to set the size later (!)
/// TODO: atomics
fn overwriteDeadFrameInfoNoSize(
self: *BufferPool,
f_idx: FrameIndex,
Expand All @@ -264,13 +263,13 @@ pub const BufferPool = struct {
}

self.frames_metadata.freqSetToZero(f_idx);
self.frames_metadata.in_queue[f_idx] = .none;
self.frames_metadata.in_queue[f_idx].store(.none, .release);
self.frames_metadata.rc[f_idx].reset();

self.frames_metadata.key[f_idx] = .{
self.frames_metadata.key[f_idx].store(@bitCast(FileIdFileOffset{
.file_id = file_id,
.file_offset = frame_aligned_file_offset,
};
}), .release);

const key: FileIdFileOffset = .{
.file_id = file_id,
Expand All @@ -293,7 +292,7 @@ pub const BufferPool = struct {
const did_remove = blk: {
const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock();
defer frame_map_lg.unlock();
break :blk frame_map.remove(self.frames_metadata.key[evicted]);
break :blk frame_map.remove(@bitCast(self.frames_metadata.key[evicted].load(.acquire)));
};
if (!did_remove) {
std.debug.panic(
Expand Down Expand Up @@ -427,8 +426,7 @@ pub const BufferPool = struct {
const bytes_read: FrameOffset = @intCast(cqe.res);
std.debug.assert(bytes_read <= FRAME_SIZE);

// TODO: atomics
self.frames_metadata.size[f_idx] = bytes_read;
self.frames_metadata.size[f_idx].store(bytes_read, .release);
}
}

Expand Down Expand Up @@ -510,27 +508,29 @@ pub const BufferPool = struct {
}
};

/// TODO: atomics on all index accesses
pub const FramesMetadata = struct {
pub const InQueue = enum(u2) { none, small, main, ghost };
pub const InQueue = enum(u8) { none, small, main, ghost }; // u8 required for extern usage

/// ref count for the frame. For frames that are currently being used elsewhere.
rc: []sig.sync.ReferenceCounter,

/// effectively the inverse of BufferPool.FrameMap, used in order to
/// evict keys by their value
key: []FileIdFileOffset,
/// This is really a FileIdFileOffset.
/// TODO: Zig 0.14 (#20590) seems to let us to do atomics on packed structs directly.
key: []std.atomic.Value(u64),

/// frequency for the S3_FIFO
/// frequency for the HierarchicalFIFO
/// Yes, really, only 0, 1, 2, 3.
/// Atomic - do not access directly.
freq: []u2,

/// which S3_FIFO queue this frame exists in
in_queue: []InQueue,
/// which HierarchicalFIFO queue this frame exists in
in_queue: []std.atomic.Value(InQueue),

/// 0..=512
size: []FrameOffset,
/// This is really a FrameOffset, but I've upped it to a u16 to appease std.atomic
size: []std.atomic.Value(u16),

fn init(allocator: std.mem.Allocator, num_frames: usize) !FramesMetadata {
const rc = try allocator.alignedAlloc(
Expand All @@ -541,21 +541,33 @@ pub const FramesMetadata = struct {
errdefer allocator.free(rc);
@memset(rc, .{ .state = .{ .raw = 0 } });

const key = try allocator.alignedAlloc(FileIdFileOffset, std.mem.page_size, num_frames);
const key = try allocator.alignedAlloc(
std.atomic.Value(u64),
std.mem.page_size,
num_frames,
);
errdefer allocator.free(key);
@memset(key, .{ .file_id = FileId.fromInt(0), .file_offset = 0 });
@memset(
key,
std.atomic.Value(u64).init(
@bitCast(FileIdFileOffset{
.file_id = FileId.fromInt(0),
.file_offset = 0,
}),
),
);

const freq = try allocator.alignedAlloc(u2, std.mem.page_size, num_frames);
errdefer allocator.free(freq);
@memset(freq, 0);

const in_queue = try allocator.alignedAlloc(InQueue, std.mem.page_size, num_frames);
const in_queue = try allocator.alignedAlloc(std.atomic.Value(InQueue), std.mem.page_size, num_frames);
errdefer allocator.free(in_queue);
@memset(in_queue, .none);
@memset(in_queue, std.atomic.Value(InQueue).init(.none));

const size = try allocator.alignedAlloc(FrameOffset, std.mem.page_size, num_frames);
const size = try allocator.alignedAlloc(std.atomic.Value(u16), std.mem.page_size, num_frames);
errdefer allocator.free(size);
@memset(size, 0);
@memset(size, std.atomic.Value(u16).init(0));

return .{
.rc = rc,
Expand Down Expand Up @@ -586,45 +598,39 @@ pub const FramesMetadata = struct {
// TODO: this should *all* be atomic (!)
fn resetFrame(self: FramesMetadata, index: FrameIndex) error{CannotResetAlive}!void {
if (self.rc[index].isAlive()) return error.CannotResetAlive;
self.freq[index] = 0;
self.in_queue[index] = .none;
self.size[index] = 0;
self.key[index] = FileIdFileOffset.INVALID;

self.freqSetToZero(index);
self.in_queue[index].store(.none, .release);
self.size[index].store(0, .release);
self.key[index].store(@bitCast(FileIdFileOffset.INVALID), .release);
}

fn freqIncrement(self: FramesMetadata, index: FrameIndex) void {
// note for reviewers: I myself do not trust this code, have a think about it

const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acq_rel);
const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acquire);
if (old_freq == 0) {
// we overflowed (3->0), set back to max
@atomicStore(u2, &self.freq[index], 3, .release);
}
}

fn freqDecrement(self: FramesMetadata, index: FrameIndex) void {
// note for reviewers: I myself do not trust this code, have a think about it

const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acq_rel);
const old_freq = @atomicRmw(u2, &self.freq[index], .Add, 1, .acquire);
if (old_freq == 3) {
// we overflowed (0->3), set back to min
@atomicStore(u2, &self.freq[index], 0, .release);
}
}

fn freqIsZero(self: FramesMetadata, index: FrameIndex) bool {
// note for reviewers: I myself do not trust this code, have a think about it
const freq = @atomicLoad(u2, &self.freq[index], .acquire);
return freq == 0;
}

fn freqSetToOne(self: FramesMetadata, index: FrameIndex) void {
// note for reviewers: I myself do not trust this code, have a think about it
@atomicStore(u2, &self.freq[index], 1, .release);
}

fn freqSetToZero(self: FramesMetadata, index: FrameIndex) void {
// note for reviewers: I myself do not trust this code, have a think about it
@atomicStore(u2, &self.freq[index], 0, .release);
}
};
Expand Down Expand Up @@ -693,7 +699,7 @@ pub const HierarchicalFIFO = struct {
) error{InvalidKey}!void {
if (key == INVALID_FRAME) return error.InvalidKey;

switch (metadata.in_queue[key]) {
switch (metadata.in_queue[key].load(.acquire)) {
.main, .small => {
metadata.freqIncrement(key);
},
Expand All @@ -703,22 +709,22 @@ pub const HierarchicalFIFO = struct {
// Add key to main too - important to note that the key *still*
// exists within ghost, but from now on we'll ignore that entry.
self.main.writeItemAssumeCapacity(key);
metadata.in_queue[key] = .main;
metadata.in_queue[key].store(.main, .release);
},
.none => {
if (self.small.writableLengthIsZero()) {
const popped_small = self.small.readItem().?;

if (metadata.freqIsZero(popped_small)) {
self.ghost.writeItemAssumeCapacity(popped_small);
metadata.in_queue[popped_small] = .ghost;
metadata.in_queue[popped_small].store(.ghost, .release);
} else {
self.main.writeItemAssumeCapacity(popped_small);
metadata.in_queue[popped_small] = .main;
metadata.in_queue[popped_small].store(.main, .release);
}
}
self.small.writeItemAssumeCapacity(key);
metadata.in_queue[key] = .small;
metadata.in_queue[key].store(.small, .release);
},
}
}
Expand Down Expand Up @@ -758,13 +764,13 @@ pub const HierarchicalFIFO = struct {
if (metadata.rc[evicted].isAlive()) {
metadata.freqSetToOne(evicted);
self.main.writeItemAssumeCapacity(evicted);
metadata.in_queue[evicted] = .main;
metadata.in_queue[evicted].store(.main, .release);
alive_eviction_attempts += 1;
continue;
}

// key is definitely dead
metadata.in_queue[evicted] = .none;
metadata.in_queue[evicted].store(.none, .release);
break evicted;
};

Expand All @@ -773,7 +779,7 @@ pub const HierarchicalFIFO = struct {

fn evictGhost(self: *HierarchicalFIFO, metadata: Metadata) ?Key {
const evicted: ?Key = while (self.ghost.readItem()) |ghost_key| {
switch (metadata.in_queue[ghost_key]) {
switch (metadata.in_queue[ghost_key].load(.acquire)) {
.ghost => {
break ghost_key;
},
Expand Down Expand Up @@ -806,8 +812,8 @@ pub const HierarchicalFIFO = struct {

const evicted: ?Key = while (queue.readItem()) |popped_key| {
switch (target_queue) {
.small => if (metadata.in_queue[popped_key] != .small) unreachable,
.main => if (metadata.in_queue[popped_key] != .main) unreachable,
.small => if (metadata.in_queue[popped_key].load(.acquire) != .small) unreachable,
.main => if (metadata.in_queue[popped_key].load(.acquire) != .main) unreachable,
}

if (metadata.freqIsZero(popped_key)) {
Expand Down Expand Up @@ -1544,7 +1550,7 @@ test "BufferPool filesize > frame_size * num_frames" {
read_frame.inner.cached.frame_indices[0]
][0..bp.frames_metadata.size[
read_frame.inner.cached.frame_indices[0]
]];
].load(.unordered)];

var frame2: [FRAME_SIZE]u8 = undefined;
const bytes_read = try file.preadAll(&frame2, offset);
Expand Down Expand Up @@ -1623,7 +1629,7 @@ test "BufferPool random read" {

var total_bytes_read: u32 = 0;
for (read.inner.cached.frame_indices) |f_idx| {
total_bytes_read += bp.frames_metadata.size[f_idx];
total_bytes_read += bp.frames_metadata.size[f_idx].load(.unordered);
}
const read_data_bp_iter = try allocator.alloc(u8, read.len());
defer allocator.free(read_data_bp_iter);
Expand Down

0 comments on commit 2b11471

Please sign in to comment.