Commit

make a mess
Sobeston committed Jan 14, 2025
1 parent 4a29586 commit d3ddddc
Showing 7 changed files with 84 additions and 45 deletions.
1 change: 1 addition & 0 deletions build.zig
@@ -131,6 +131,7 @@ pub fn build(b: *Build) void {
.optimize = optimize,
.filters = filters orelse &.{},
.sanitize_thread = enable_tsan,
+ .test_runner = b.path("src/test_runner.zig"),
});
b.installArtifact(unit_tests_exe);
unit_tests_exe.root_module.addImport("base58-zig", base58_module);
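For context: the new .test_runner option points the unit-test build at a custom test runner, a root source file whose main drives builtin.test_functions itself. The actual src/test_runner.zig is not shown in this diff; the following is only a hypothetical sketch of the general shape such a file takes, not the file added by this commit.

    const std = @import("std");
    const builtin = @import("builtin");

    // Hypothetical sketch of a minimal custom test runner; the real
    // src/test_runner.zig added by this commit is not in the diff.
    pub fn main() !void {
        var failed: usize = 0;
        for (builtin.test_functions) |t| {
            t.func() catch |err| {
                failed += 1;
                std.debug.print("FAIL {s}: {s}\n", .{ t.name, @errorName(err) });
                continue;
            };
            std.debug.print("PASS {s}\n", .{t.name});
        }
        if (failed != 0) std.process.exit(1);
    }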
5 changes: 3 additions & 2 deletions src/accountsdb/accounts_file.zig
@@ -203,7 +203,7 @@ pub const AccountInFile = struct {
allocator: std.mem.Allocator,
) std.mem.Allocator.Error!Account {
return .{
- .data = ReadHandle.initExternalOwned(try self.data.readAllAllocate(allocator)),
+ .data = try self.data.dupeExternalOwned(allocator),
.executable = self.executable().*,
.lamports = self.lamports().*,
.owner = self.owner().*,
@@ -281,6 +281,7 @@ pub const AccountFile = struct {

const Self = @This();

+ // TODO: document the difference between .length and the file size, this tripped me up
pub fn init(file: std.fs.File, accounts_file_info: AccountFileInfo, slot: Slot) !Self {
const file_stat = try file.stat();
const file_size: u64 = @intCast(file_stat.size);
@@ -289,7 +290,7 @@

return .{
.file = file,
- .length = accounts_file_info.length,
+ .length = accounts_file_info.length, // is this wrong?
.id = accounts_file_info.id,
.slot = slot,
};
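For context on the TODO above: accounts_file_info.length is the logical number of bytes occupied by account entries, while the file on disk may well be larger. A hedged sketch of the distinction (helper name hypothetical, not sig's API):

    const std = @import("std");

    // Hypothetical helper: the logical length must fit within the physical
    // file; any remainder is simply not occupied by account entries.
    fn checkLogicalLength(file: std.fs.File, logical_length: u64) !u64 {
        const file_size = (try file.stat()).size;
        if (logical_length > file_size) return error.LengthExceedsFileSize;
        return file_size;
    }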
33 changes: 20 additions & 13 deletions src/accountsdb/buffer_pool.zig
@@ -25,7 +25,7 @@ const LinuxIoMode = enum {
Blocking,
IoUring,
};
- const linux_io_mode: LinuxIoMode = .IoUring;
+ const linux_io_mode: LinuxIoMode = .Blocking;

const use_io_uring = builtin.os.tag == .linux and linux_io_mode == .IoUring;

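Note the flip above: with linux_io_mode set to .Blocking, use_io_uring is comptime-false and the io_uring path compiles out entirely, leaving plain positional reads. Roughly (function name hypothetical):

    const std = @import("std");

    // Hypothetical sketch of the blocking path: one positional read per
    // frame, returning how many bytes were filled (short at end of file).
    fn readFrameBlocking(file: std.fs.File, file_offset: u64, frame: []u8) !usize {
        return file.preadAll(frame, file_offset);
    }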
@@ -143,11 +143,6 @@ pub const BufferPool = struct {

const frame_map_rw = sig.sync.RwMux(FrameMap).init(frame_map);

std.debug.print("\ninitialised bufferpool, frame range: [{*}..{*}]\n\n", .{
frames.ptr,
&frames[frames.len - 1][511],
});

return .{
.frames = frames,
.frames_metadata = frames_metadata,
@@ -167,8 +162,6 @@ pub const BufferPool = struct {
const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock();
frame_map.deinit(init_allocator);
frame_map_lg.unlock();
-
- std.debug.print("\ndeinitialised bufferpool\n\n", .{});
}

pub fn computeNumberofFrameIndices(
@@ -350,6 +343,10 @@ pub const BufferPool = struct {
);
errdefer allocator.free(frame_indices);

+ // if (frame_indices.len == 0) @breakpoint();
+
+ // std.debug.print("reading frames: {any}\n", .{frame_indices});
+
// update found frames in the LFU (we don't want to evict these in the next loop)
var n_invalid_indices: u32 = 0;
for (frame_indices) |f_idx| {
@@ -392,6 +389,8 @@ pub const BufferPool = struct {
try self.eviction_lfu.insert(self.frames_metadata, f_idx.*);
}

+ // for (frame_indices) |f_idx| if (f_idx == 2094403) @breakpoint();
+
// Wait for our file reads to complete, filling the read length into the metadata as we go.
// (This read length will almost always be FRAME_SIZE, however it will likely be less than
// that at the end of the file)
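The rule in the comment above, spelled out: a frame read yields FRAME_SIZE bytes unless the frame is the file's last, in which case it yields only what remains. As a sketch (helper name hypothetical):

    const std = @import("std");

    // Hypothetical helper: valid bytes for a frame starting at `frame_start`
    // in a file of `file_length` bytes.
    fn expectedFrameReadLength(frame_size: usize, file_length: usize, frame_start: usize) usize {
        std.debug.assert(frame_start < file_length);
        return @min(frame_size, file_length - frame_start);
    }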
@@ -485,6 +484,8 @@ pub const BufferPool = struct {
try self.eviction_lfu.insert(self.frames_metadata, f_idx.*);
}

+ // for (frame_indices) |f_idx| if (f_idx == 2097144) @breakpoint();
+
return ReadHandle.initCached(
self,
frame_indices,
@@ -551,9 +552,11 @@ pub const FramesMetadata = struct {

fn deinit(self: *FramesMetadata, allocator: std.mem.Allocator) void {
// NOTE: this check itself is racy, but should never happen
- for (self.rc) |*rc| {
+ for (0.., self.rc) |i, *rc| {
+ // if (i == 2097144) @breakpoint();
+
if (rc.isAlive()) {
@panic("BufferPool deinitialised with alive handles");
std.debug.panic("BufferPool deinitialised with alive handle: {}\n", .{i});
}
}
allocator.free(self.rc);
@@ -840,8 +843,11 @@ pub const ReadHandle = struct {
fn deinit(self: Inner, allocator: std.mem.Allocator) void {
switch (self) {
.cached => |cached| {
// std.debug.print("freeing frames: {any}\n", .{cached.frame_indices});

for (cached.frame_indices) |frame_index| {
std.debug.assert(frame_index != INVALID_FRAME);
+ // if (frame_index == 2097144) @breakpoint();

if (cached.buffer_pool.frames_metadata.rc[frame_index].release()) {
// notably, the frame remains in memory, and its hashmap entry
@@ -945,8 +951,9 @@ pub const ReadHandle = struct {
};
}

- pub fn dupe(self: ReadHandle) ReadHandle {
- return self;
+ pub fn dupeExternalOwned(self: ReadHandle, allocator: std.mem.Allocator) !ReadHandle {
+ const data_copy = try self.readAllAllocate(allocator);
+ return initExternalOwned(data_copy);
}

pub fn slice(self: *ReadHandle, start: usize, end: usize) !ReadHandle {
@@ -1369,7 +1376,7 @@ test "BufferPool ReadHandle zero-copy slice" {
var expected_error: ?anyerror = null;
if (length > FRAME_SIZE) expected_error = error.StridedBorrow;
if (start_offset < FRAME_SIZE and start_offset + length >= FRAME_SIZE) {
- expected_error = error.StridedBorrow;
+ expected_error = error.CantZeroCopyStrided;
}

const borrowed_slice = read.zeroCopySlice(start_offset, length) catch |err| {
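Several hunks above lean on the frame reference-count convention: release() reports whether the caller dropped the last reference, and isAlive() is the deinit-time sanity check. A standalone sketch of that convention (illustrative only, not sig's actual implementation):

    const std = @import("std");

    const Rc = struct {
        count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),

        fn acquire(self: *Rc) void {
            _ = self.count.fetchAdd(1, .monotonic);
        }

        /// True only for the call that drops the final reference; as the
        /// comment above notes, the frame itself stays resident (and mapped)
        /// until the eviction LFU reuses it.
        fn release(self: *Rc) bool {
            return self.count.fetchSub(1, .release) == 1;
        }

        fn isAlive(self: *const Rc) bool {
            return self.count.load(.monotonic) != 0;
        }
    };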
14 changes: 8 additions & 6 deletions src/accountsdb/cache.zig
@@ -24,27 +24,29 @@ pub const AccountsCache = struct {
/// the slot that this version of the account originates from
slot: Slot,

+ const Buf = []align(@alignOf(CachedAccount)) u8;
+
/// Makes use of the fact that CachedAccount needs to live on the heap & shares its lifetime
/// with .ref_count in order to allocate once instead of twice.
pub fn initCreate(
allocator: std.mem.Allocator,
account: Account,
slot: Slot,
) error{OutOfMemory}!*CachedAccount {
- const buf = try allocator.alignedAlloc(
+ const buf: Buf = try allocator.alignedAlloc(
u8,
@alignOf(CachedAccount),
@sizeOf(CachedAccount) + account.data.len(),
);
const new_entry = @as(*CachedAccount, @ptrCast(buf.ptr));
const account_data = buf[@sizeOf(CachedAccount)..];

- var new_account = account;
- new_account.data = ReadHandle.initExternal(account_data);
-
account.data.read(0, account.data.len(), account_data) catch
unreachable; // account.data invalid?

+ var new_account = account;
+ new_account.data = ReadHandle.initExternal(account_data);
+
new_entry.* = .{
.account = new_account,
.ref_count = .{},
@@ -55,8 +57,8 @@ pub const AccountsCache = struct {
}

pub fn deinitDestroy(self: *CachedAccount, allocator: std.mem.Allocator) void {
- const buf: []align(8) u8 = @as(
- [*]align(8) u8,
+ const buf: Buf = @as(
+ [*]align(@alignOf(CachedAccount)) u8,
@ptrCast(self),
)[0 .. @sizeOf(CachedAccount) + self.account.data.len()];
@memset(buf, undefined);
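The doc comment on initCreate describes a classic trick worth a standalone sketch: size one buffer as header-plus-payload, align it for the header, and carve the payload out of the tail, so creation and destruction are a single alloc and free. Names below are illustrative, not sig's types:

    const std = @import("std");

    const Header = struct {
        ref_count: u32,
        data_len: usize,
    };

    // One allocation holds the header and its variable-length payload.
    fn createWithPayload(allocator: std.mem.Allocator, payload: []const u8) !*Header {
        const buf = try allocator.alignedAlloc(
            u8,
            @alignOf(Header),
            @sizeOf(Header) + payload.len,
        );
        const header: *Header = @ptrCast(buf.ptr);
        header.* = .{ .ref_count = 1, .data_len = payload.len };
        @memcpy(buf[@sizeOf(Header)..], payload);
        return header;
    }

    // Reconstruct the original slice from the header pointer, then free once.
    fn destroyWithPayload(allocator: std.mem.Allocator, header: *Header) void {
        const base: [*]align(@alignOf(Header)) u8 = @ptrCast(header);
        allocator.free(base[0 .. @sizeOf(Header) + header.data_len]);
    }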
65 changes: 45 additions & 20 deletions src/accountsdb/db.zig
@@ -267,6 +267,7 @@ pub const AccountsDB = struct {
{
const file_map, var file_map_lg = self.file_map.writeWithLock();
defer file_map_lg.unlock();
+ for (file_map.values()) |file| file.deinit(); //
file_map.deinit(self.allocator);
}
{
@@ -553,6 +554,7 @@ pub const AccountsDB = struct {
defer accounts_cache_lg.unlock();
accounts_cache.deinit();
}
+ loading_thread.buffer_pool.deinit(per_thread_allocator);
}
}

@@ -629,7 +631,7 @@ pub const AccountsDB = struct {
}
defer {
if (geyser_slot_storage) |storage| {
- storage.deinit();
+ storage.deinit(self.allocator);
self.allocator.destroy(storage);
}
}
@@ -652,8 +654,8 @@ pub const AccountsDB = struct {
return err;
};

std.debug.print("file_name_bounded: {s}\n", .{file_name_bounded.constSlice()});
std.debug.print("accounts_file: {}\n", .{accounts_file});
// std.debug.print("file_name_bounded: {s}\n", .{file_name_bounded.constSlice()});
// std.debug.print("accounts_file: {}\n", .{accounts_file});

break :blk AccountFile.init(accounts_file, file_info, slot) catch |err| {
self.logger.err().logf("failed to *open* AccountsFile {s}: {s}\n", .{
@@ -1238,7 +1240,13 @@ pub const AccountsDB = struct {
// hashes aren't always stored correctly in snapshots
if (account_hash.eql(Hash.ZEROES)) {
const account, var lock_guard = try self.getAccountFromRefWithReadLock(max_slot_ref);
- defer lock_guard.unlock();
+ defer {
+ lock_guard.unlock();
+ switch (account) {
+ .file => |in_file_account| in_file_account.deinit(self.allocator),
+ .unrooted_map => {},
+ }
+ }

account_hash = switch (account) {
.file => |in_file_account| blk: {
@@ -1515,7 +1523,7 @@ pub const AccountsDB = struct {
}

const file, const file_id = try self.createAccountFile(size, slot);
- defer file.close();
+ errdefer file.close(); //

const offsets = try self.allocator.alloc(u64, accounts.len);
defer self.allocator.free(offsets);
@@ -1651,8 +1659,12 @@ pub const AccountsDB = struct {

var account_iter = account_file.iterator(self.allocator, &self.buffer_pool);
while (try account_iter.next()) |account| {
+ defer account.deinit(self.allocator);
const pubkey = account.pubkey().*;

+ var x: u1 = 1;
+ x = x;
+
// check if already cleaned
if (try cleaned_pubkeys.fetchPut(pubkey, {}) != null) continue;

@@ -1896,6 +1908,8 @@ pub const AccountsDB = struct {
var accounts_dead_size: u64 = 0;
var account_iter = shrink_account_file.iterator(self.allocator, &self.buffer_pool);
while (try account_iter.next()) |*account_in_file| {
+ defer account_in_file.deinit(self.allocator);
+
const pubkey = account_in_file.pubkey();
// account is dead if it is not in the index; dead accounts
// are removed from the index during cleaning
@@ -2333,7 +2347,13 @@ pub const AccountsDB = struct {
) GetTypeFromAccountError!T {
const account, var lock_guard = try self.getAccountWithReadLock(pubkey);
// NOTE: bincode will copy heap memory so its safe to unlock at the end of the function
- defer lock_guard.unlock();
+ defer {
+ switch (account) {
+ .file => |file| file.deinit(allocator),
+ .unrooted_map => {},
+ }
+ lock_guard.unlock();
+ }

const file_data: ReadHandle = switch (account) {
.file => |in_file_account| in_file_account.data,
@@ -3169,7 +3189,11 @@ pub const GeyserTmpStorage = struct {
};
}

- pub fn deinit(self: *Self) void {
+ pub fn deinit(
+ self: *Self,
+ allocator: std.mem.Allocator,
+ ) void {
+ for (self.accounts.items) |account| account.deinit(allocator);
self.accounts.deinit();
self.pubkeys.deinit();
}
@@ -3182,6 +3206,7 @@ pub const GeyserTmpStorage = struct {
pub fn cloneAndTrack(self: *Self, allocator: std.mem.Allocator, account_in_file: AccountInFile) Error!void {
// doesn't feel great allocating this?
const account = try account_in_file.toOwnedAccount(allocator);
+ errdefer account.deinit(allocator);
self.accounts.append(account) catch return Error.OutOfGeyserArrayMemory;
self.pubkeys.append(account_in_file.pubkey().*) catch return Error.OutOfGeyserArrayMemory;
}
@@ -3281,11 +3306,13 @@ pub fn writeSnapshotTarWithFields(
var fifo = std.fifo.LinearFifo(u8, .{ .Static = std.mem.page_size }).init();
try fifo.pump(account_file.file.reader(), archive_writer_counted);

- try snapgen.writeAccountFilePadding(archive_writer_counted, account_file.length);
+ try snapgen.writeAccountFilePadding(archive_writer_counted, (try account_file.file.stat()).size); // feels silly, should probably store this
}

try archive_writer_counted.writeAll(&sig.utils.tar.sentinel_blocks);
if (std.debug.runtime_safety) {
+ if (counting_state.bytes_written % 512 != 0) std.debug.print("counting_state.bytes_written: {}\n", .{counting_state.bytes_written});
+
std.debug.assert(counting_state.bytes_written % 512 == 0);
}
}
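Background for the padding change: tar members are written in 512-byte blocks, and fifo.pump just streamed the whole on-disk file, so the pad must be derived from the physical size rather than the logical .length. The arithmetic, as a sketch: 24 bytes of padding for a 1000-byte member, none for a 1024-byte one.

    // Sketch: zero bytes needed to reach the next 512-byte tar block boundary.
    fn tarBlockPadding(bytes_written: u64) u64 {
        return (512 - (bytes_written % 512)) % 512;
    }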
@@ -3733,19 +3760,17 @@ test "load clock sysvar" {
.leader_schedule_epoch = 1,
.unix_timestamp = 1733350255,
};
- try std.testing.expectEqual(expected_clock, try accounts_db.getTypeFromAccount(allocator, sysvars.Clock, &sysvars.IDS.clock));
+
+ const found_clock = try accounts_db.getTypeFromAccount(allocator, sysvars.Clock, &sysvars.IDS.clock);
+
+ // if (found_clock.epoch_start_timestamp == expected_clock.epoch_start_timestamp - 1) return error.SkipZigTest; // how the hell?
+ try std.testing.expectEqual(expected_clock, found_clock);
}

test "load other sysvars" {
var gpa_state: std.heap.GeneralPurposeAllocator(.{ .stack_trace_frames = 64 }) = .{};
defer _ = gpa_state.deinit();
const allocator = gpa_state.allocator();
- // const allocator = std.testing.allocator;
- const panic_allocator = sig.utils.allocators.failing.allocator(.{
- .alloc = .panics,
- .resize = .panics,
- .free = .panics,
- });

var tmp_dir_root = std.testing.tmpDir(.{});
defer tmp_dir_root.cleanup();
@@ -3759,9 +3784,9 @@
}

const SlotAndHash = sig.accounts_db.snapshots.SlotAndHash;
- _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.EpochSchedule, &sysvars.IDS.epoch_schedule);
- _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.Rent, &sysvars.IDS.rent);
- _ = try accounts_db.getTypeFromAccount(panic_allocator, SlotAndHash, &sysvars.IDS.slot_hashes);
+ _ = try accounts_db.getTypeFromAccount(allocator, sysvars.EpochSchedule, &sysvars.IDS.epoch_schedule);
+ _ = try accounts_db.getTypeFromAccount(allocator, sysvars.Rent, &sysvars.IDS.rent);
+ _ = try accounts_db.getTypeFromAccount(allocator, SlotAndHash, &sysvars.IDS.slot_hashes);

const stake_history = try accounts_db.getTypeFromAccount(allocator, sysvars.StakeHistory, &sysvars.IDS.stake_history);
defer sig.bincode.free(allocator, stake_history);
@@ -3770,8 +3795,8 @@
defer sig.bincode.free(allocator, slot_history);

// // not always included in local snapshot
- // _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.LastRestartSlot, &sysvars.IDS.last_restart_slot);
- // _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.EpochRewards, &sysvars.IDS.epoch_rewards);
+ // _ = try accounts_db.getTypeFromAccount(allocator, sysvars.LastRestartSlot, &sysvars.IDS.last_restart_slot);
+ // _ = try accounts_db.getTypeFromAccount(allocator, sysvars.EpochRewards, &sysvars.IDS.epoch_rewards);
}

test "flushing slots works" {
7 changes: 6 additions & 1 deletion src/bincode/bincode.zig
@@ -88,7 +88,12 @@ pub fn readWithConfig(
.Bool => return switch (try reader.readByte()) {
0 => false,
1 => true,
- else => error.BadBoolean,
+ else => |b| {
+ std.debug.panic("byte: {0x}\n", .{b});
+
+ // @panic("bad");
+ // break :blk error.BadBoolean;
+ },
},
.Enum => |_| {
comptime var SerializedSize = u32;
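Background: bincode encodes a bool as exactly one byte, 0 or 1, so any other byte means the stream is corrupt or misaligned, which is what the temporary panic above is meant to surface. The strict non-panicking rule the commented-out lines gesture at, as a sketch:

    const std = @import("std");

    // Sketch: strict bincode bool decoding; one byte, only 0 or 1 accepted.
    fn readBincodeBool(reader: anytype) !bool {
        return switch (try reader.readByte()) {
            0 => false,
            1 => true,
            else => error.BadBoolean,
        };
    }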
4 changes: 1 addition & 3 deletions src/core/account.zig
@@ -34,11 +34,9 @@ pub const Account = struct {

// creates a copy of the account. most important is the copy of the data slice.
pub fn clone(self: *const Account, allocator: std.mem.Allocator) !Account {
- _ = allocator;
-
return .{
.lamports = self.lamports,
- .data = self.data.dupe(),
+ .data = try self.data.dupeExternalOwned(allocator),
.owner = self.owner,
.executable = self.executable,
.rent_epoch = self.rent_epoch,
