Merge branch 'main' into dnut/repair3
dnut committed Jun 12, 2024
2 parents 65af230 + 10082ad commit b5b6828
Showing 12 changed files with 105 additions and 61 deletions.
21 changes: 9 additions & 12 deletions build.zig
@@ -62,25 +62,24 @@ pub fn build(b: *Build) void {

const main_exe_run = b.addRunArtifact(sig_exe);
main_exe_run.addArgs(b.args orelse &.{});
main_exe_run.step.dependOn(b.getInstallStep());
run_step.dependOn(&main_exe_run.step);

// unit tests
const unit_tests = b.addTest(.{
const unit_tests_exe = b.addTest(.{
.root_source_file = b.path("src/tests.zig"),
.target = target,
.optimize = optimize,
.filters = filters orelse &.{},
});
b.installArtifact(unit_tests);
unit_tests.root_module.addImport("base58-zig", base58_module);
unit_tests.root_module.addImport("curl", curl_mod);
unit_tests.root_module.addImport("httpz", httpz_mod);
unit_tests.root_module.addImport("zig-network", zig_network_module);
unit_tests.root_module.addImport("zstd", zstd_mod);
b.installArtifact(unit_tests_exe);
unit_tests_exe.root_module.addImport("base58-zig", base58_module);
unit_tests_exe.root_module.addImport("curl", curl_mod);
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);

const unit_tests_run = b.addRunArtifact(unit_tests);
test_step.dependOn(&unit_tests_run.step);
const unit_tests_exe_run = b.addRunArtifact(unit_tests_exe);
test_step.dependOn(&unit_tests_exe_run.step);

const fuzz_exe = b.addExecutable(.{
.name = "fuzz",
@@ -95,7 +94,6 @@ pub fn build(b: *Build) void {

const fuzz_exe_run = b.addRunArtifact(fuzz_exe);
fuzz_exe_run.addArgs(b.args orelse &.{});
fuzz_exe_run.step.dependOn(b.getInstallStep());
fuzz_step.dependOn(&fuzz_exe_run.step);

const benchmark_exe = b.addExecutable(.{
@@ -110,7 +108,6 @@ pub fn build(b: *Build) void {
benchmark_exe.root_module.addImport("httpz", httpz_mod);

const benchmark_exe_run = b.addRunArtifact(benchmark_exe);
benchmark_exe_run.step.dependOn(b.getInstallStep());
benchmark_exe_run.addArgs(b.args orelse &.{});
benchmark_step.dependOn(&benchmark_exe_run.step);
}
20 changes: 17 additions & 3 deletions src/accountsdb/db.zig
@@ -15,6 +15,7 @@ const AccountFileInfo = @import("../accountsdb/snapshots.zig").AccountFileInfo;
const AccountFile = @import("../accountsdb/accounts_file.zig").AccountFile;
const FileId = @import("../accountsdb/accounts_file.zig").FileId;
const AccountInFile = @import("../accountsdb/accounts_file.zig").AccountInFile;
const Blake3 = std.crypto.hash.Blake3;

const ThreadPool = @import("../sync/thread_pool.zig").ThreadPool;

@@ -763,11 +764,24 @@ pub const AccountsDB = struct {
} orelse continue;
const result = try self.getAccountHashAndLamportsFromRef(max_slot_ref);

// only include non-zero lamport accounts (for full snapshots)
const lamports = result.lamports;
if (config == .FullAccountHash and lamports == 0) continue;
var account_hash = result.hash;
if (lamports == 0) {
switch (config) {
// for full snapshots, only include non-zero lamport accounts
.FullAccountHash => continue,
// zero-lamport accounts for incrementals = hash(pubkey)
.IncrementalAccountHash => Blake3.hash(&key.data, &account_hash.data, .{}),
}
} else {
// hashes aren't always stored correctly in snapshots
if (account_hash.order(&Hash.default()) == .eq) {
const account = try self.getAccountFromRef(max_slot_ref);
account_hash = account.hash(&key);
}
}

hashes.appendAssumeCapacity(result.hash);
hashes.appendAssumeCapacity(account_hash);
local_total_lamports += lamports;
}

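Note on the db.zig change above: for incremental snapshots, a zero-lamport account now contributes Blake3(pubkey) to the accounts hash instead of being skipped, and a default (all-zero) stored hash is recomputed from the account itself. A minimal standalone sketch of the zero-lamport branch, with Hash and Pubkey stubbed out for illustration:

const std = @import("std");
const Blake3 = std.crypto.hash.Blake3;

const Hash = extern struct { data: [32]u8 };
const Pubkey = extern struct { data: [32]u8 };

// Zero-lamport accounts in an incremental snapshot hash to Blake3(pubkey).
fn zeroLamportAccountHash(key: Pubkey) Hash {
    var out: Hash = undefined;
    Blake3.hash(&key.data, &out.data, .{});
    return out;
}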
16 changes: 12 additions & 4 deletions src/accountsdb/snapshots.zig
@@ -724,6 +724,7 @@ pub const StatusCache = struct {
bincode.free(self.bank_slot_deltas.allocator, self.*);
}

/// [verify_slot_deltas](https://github.com/anza-xyz/agave/blob/ed500b5afc77bc78d9890d96455ea7a7f28edbf9/runtime/src/snapshot_bank_utils.rs#L709)
pub fn validate(
self: *const StatusCache,
allocator: std.mem.Allocator,
@@ -758,14 +759,21 @@ pub const StatusCache = struct {
return error.SlotHistoryMismatch;
}
for (slots_seen.keys()) |slot| {
if (slot_history.check(slot) != sysvars.SlotCheckResult.Found) {
if (slot_history.check(slot) != .Found) {
return error.SlotNotFoundInHistory;
}
}
for (slot_history.oldest()..slot_history.newest()) |slot| {
if (!slots_seen.contains(slot)) {
return error.SlotNotFoundInStatusCache;

var slots_checked: u32 = 0;
var slot = slot_history.newest();
while (slot >= slot_history.oldest() and slots_checked != MAX_CACHE_ENTRIES) {
if (slot_history.check(slot) == .Found) {
slots_checked += 1;
if (!slots_seen.contains(slot)) {
return error.SlotNotFoundInStatusCache;
}
}
slot -= 1;
}
}
};
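The rewritten validate() loop now scans backwards from the newest slot in the history, counts only slots the history marks as Found, and stops after MAX_CACHE_ENTRIES of them, rather than requiring every slot in the window to appear in the status cache. A condensed sketch of that bound (the MAX_CACHE_ENTRIES value here is an assumption for illustration; the real constant lives elsewhere in snapshots.zig):

const MAX_CACHE_ENTRIES: u32 = 300; // assumed value for illustration

// Walk slots from `newest` down to `oldest`, inspecting at most
// MAX_CACHE_ENTRIES of them, mirroring the loop bound in validate().
// validate() additionally counts only slots marked Found and checks slots_seen.
fn boundedSlotScan(newest: u64, oldest: u64) u32 {
    var slots_checked: u32 = 0;
    var slot = newest;
    while (slot >= oldest and slots_checked != MAX_CACHE_ENTRIES) {
        slots_checked += 1;
        if (slot == 0) break; // guard against u64 wrap-around
        slot -= 1;
    }
    return slots_checked;
}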
2 changes: 1 addition & 1 deletion src/common/merkle_tree.zig
@@ -36,7 +36,7 @@ pub const NestedHashTree = struct {
var i: usize = 0;
while (i < self.hashes.len) {
const nested_len = self.hashes[i].items.len;
if ((search_index + nested_len) > index) {
if (search_index + nested_len > index) {
const index_in_nested = index - search_index;
return &self.hashes[i].items[index_in_nested];
} else {
2 changes: 1 addition & 1 deletion src/core/hash.zig
@@ -4,7 +4,7 @@ const base58 = @import("base58-zig");

pub const HASH_SIZE: usize = 32;

pub const Hash = struct {
pub const Hash = extern struct {
data: [HASH_SIZE]u8,

pub fn default() Hash {
5 changes: 4 additions & 1 deletion src/core/pubkey.zig
@@ -81,7 +81,10 @@ pub const Pubkey = extern struct {
}

pub fn format(self: @This(), comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
return writer.print("{s}", .{self.string()});
var dest: [44]u8 = undefined;
@memset(&dest, 0);
const written = encoder.encode(&self.data, &dest) catch return error.EncodingError;
return writer.print("{s}", .{dest[0..written]});
}

pub fn isDefault(self: *const Self) bool {
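The new Pubkey.format writes the base58 text into a fixed 44-byte stack buffer (44 is the longest base58 encoding of 32 bytes), so formatting no longer goes through self.string(). A hedged usage sketch, assuming it sits next to the struct in src/core/pubkey.zig and that data is Pubkey's only field:

test "print a pubkey without heap allocation" {
    const key = Pubkey{ .data = [_]u8{7} ** 32 }; // arbitrary fixed key for illustration
    var buf: [64]u8 = undefined;
    const text = try std.fmt.bufPrint(&buf, "{}", .{key});
    try std.testing.expect(text.len > 0 and text.len <= 44);
}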
6 changes: 3 additions & 3 deletions src/shred_collector/service.zig
@@ -75,7 +75,7 @@ pub fn start(
deps.allocator,
1000,
);
const verified_shreds_channel = sig.sync.Channel(std.ArrayList(sig.net.Packet)).init(
const verified_shred_channel = sig.sync.Channel(std.ArrayList(sig.net.Packet)).init(
deps.allocator,
1000,
);
@@ -100,7 +100,7 @@
.{
deps.exit,
unverified_shred_channel,
verified_shreds_channel,
verified_shred_channel,
deps.leader_schedule,
},
);
@@ -116,7 +116,7 @@
try service_manager.spawn(
"Shred Processor",
shred_collector.shred_processor.runShredProcessor,
.{ deps.allocator, verified_shreds_channel, shred_tracker },
.{ deps.allocator, deps.logger, verified_shred_channel, shred_tracker },
);

// repair (thread)
4 changes: 2 additions & 2 deletions src/shred_collector/shred.zig
@@ -48,7 +48,7 @@ pub const Shred = union(ShredType) {
}

pub fn fromPayload(allocator: Allocator, payload: []const u8) !Self {
const variant = layout.getShredVariant(payload) orelse return error.uygugj;
const variant = layout.getShredVariant(payload) orelse return error.InvalidShredVariant;
return switch (variant.shred_type) {
.Code => .{ .Code = .{ .fields = try CodingShred.Fields.fromPayload(allocator, payload) } },
.Data => .{ .Data = .{ .fields = try DataShred.Fields.fromPayload(allocator, payload) } },
@@ -154,7 +154,7 @@ pub const DataShred = struct {
return self.payload[consts.headers_size..size];
}

pub fn parent(self: *const Self) !Slot {
pub fn parent(self: *const Self) error{InvalidParentOffset}!Slot {
const slot = self.fields.common.slot;
if (self.fields.custom.parent_offset == 0 and slot != 0) {
return error.InvalidParentOffset;
65 changes: 43 additions & 22 deletions src/shred_collector/shred_processor.zig
@@ -9,18 +9,20 @@ const ArrayList = std.ArrayList;

const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker;
const Channel = sig.sync.Channel;
const Logger = sig.trace.Logger;
const Packet = sig.net.Packet;
const Shred = shred_collector.shred.Shred;

/// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395)
pub fn runShredProcessor(
allocator: Allocator,
logger: Logger,
// shred verifier --> me
verified_shred_receiver: *Channel(ArrayList(Packet)),
tracker: *BasicShredTracker,
) !void {
var processed_count: usize = 0;
var buf = ArrayList(ArrayList(Packet)).init(allocator);
var error_context = ErrorContext{};
while (true) {
try verified_shred_receiver.tryDrainRecycle(&buf);
if (buf.items.len == 0) {
@@ -29,29 +31,48 @@ pub fn runShredProcessor(
}
for (buf.items) |packet_batch| {
for (packet_batch.items) |*packet| if (!packet.flags.isSet(.discard)) {
const shred_payload = layout.getShred(packet) orelse continue;
const slot = layout.getSlot(shred_payload) orelse continue;
const index = layout.getIndex(shred_payload) orelse continue;
tracker.registerShred(slot, index) catch |err| switch (err) {
error.SlotUnderflow, error.SlotOverflow => continue,
else => return err,
processShred(allocator, tracker, packet, &error_context) catch |e| {
logger.errf(
"failed to process verified shred {?}.{?}: {}",
.{ error_context.slot, error_context.index, e },
);
error_context = .{};
};
var shred = try Shred.fromPayload(allocator, shred_payload);
if (shred == Shred.Data) {
const parent = try shred.Data.parent();
if (parent + 1 != slot) {
try tracker.skipSlots(parent, slot);
}
}
defer shred.deinit();
if (shred.isLastInSlot()) {
tracker.setLastShred(slot, index) catch |err| switch (err) {
error.SlotUnderflow, error.SlotOverflow => continue,
else => return err,
};
}
processed_count += 1;
};
}
}
}

const ErrorContext = struct { slot: ?u64 = null, index: ?u32 = null };

fn processShred(
allocator: Allocator,
tracker: *BasicShredTracker,
packet: *const Packet,
error_context: *ErrorContext,
) !void {
const shred_payload = layout.getShred(packet) orelse return error.InvalidPayload;
const slot = layout.getSlot(shred_payload) orelse return error.InvalidSlot;
errdefer error_context.slot = slot;
const index = layout.getIndex(shred_payload) orelse return error.InvalidIndex;
errdefer error_context.index = index;

tracker.registerShred(slot, index) catch |err| switch (err) {
error.SlotUnderflow, error.SlotOverflow => return,
};
var shred = try Shred.fromPayload(allocator, shred_payload);
defer shred.deinit();
if (shred == Shred.Data) {
const parent = try shred.Data.parent();
if (parent + 1 != slot) {
tracker.skipSlots(parent, slot) catch |err| switch (err) {
error.SlotUnderflow, error.SlotOverflow => {},
};
}
}
if (shred.isLastInSlot()) {
tracker.setLastShred(slot, index) catch |err| switch (err) {
error.SlotUnderflow, error.SlotOverflow => return,
};
}
}
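The refactor above pulls per-shred handling into processShred and uses errdefer to record which slot/index was being processed when an error escapes; the caller logs it, resets the context, and keeps draining the channel instead of returning. A self-contained sketch of that errdefer pattern (parseSlot and the payload layout are made up for illustration):

const std = @import("std");

const ErrorContext = struct { slot: ?u64 = null, index: ?u32 = null };

fn parseSlot(payload: []const u8) !u64 {
    if (payload.len == 0) return error.InvalidSlot;
    return payload[0]; // pretend the first byte is the slot
}

fn process(payload: []const u8, ctx: *ErrorContext) !void {
    const slot = try parseSlot(payload);
    errdefer ctx.slot = slot; // recorded only if a later step fails
    return error.InvalidIndex; // stand-in for a downstream failure
}

test "errdefer fills the error context for the caller's log line" {
    var ctx = ErrorContext{};
    process(&[_]u8{42}, &ctx) catch {};
    try std.testing.expectEqual(@as(?u64, 42), ctx.slot);
}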
20 changes: 11 additions & 9 deletions src/shred_collector/shred_tracker.zig
@@ -57,7 +57,7 @@ pub const BasicShredTracker = struct {
self: *Self,
start_inclusive: Slot,
end_exclusive: Slot,
) !void {
) SlotOutOfBounds!void {
self.mux.lock();
defer self.mux.unlock();

@@ -75,17 +75,17 @@
self: *Self,
slot: Slot,
shred_index: u64,
) !void {
) SlotOutOfBounds!void {
self.mux.lock();
defer self.mux.unlock();

const monitored_slot = try self.observeSlot(slot);
const new = try monitored_slot.record(shred_index);
const new = monitored_slot.record(shred_index);
if (new) self.logger.debugf("new slot: {}", .{slot});
self.max_slot_processed = @max(self.max_slot_processed, slot);
}

pub fn setLastShred(self: *Self, slot: Slot, index: usize) !void {
pub fn setLastShred(self: *Self, slot: Slot, index: usize) SlotOutOfBounds!void {
self.mux.lock();
defer self.mux.unlock();

@@ -98,7 +98,7 @@
}

/// returns whether it makes sense to send any repair requests
pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) !bool {
pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) (Allocator.Error || SlotOutOfBounds)!bool {
if (self.start_slot == null) return false;
self.mux.lock();
defer self.mux.unlock();
@@ -131,14 +131,14 @@

/// - Record that a slot has been observed.
/// - Acquire the slot's status for mutation.
fn observeSlot(self: *Self, slot: Slot) !*MonitoredSlot {
fn observeSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot {
self.maybeSetStart(slot);
self.max_slot_seen = @max(self.max_slot_seen, slot);
const monitored_slot = try self.getMonitoredSlot(slot);
return monitored_slot;
}

fn getMonitoredSlot(self: *Self, slot: Slot) error{ SlotUnderflow, SlotOverflow }!*MonitoredSlot {
fn getMonitoredSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot {
if (slot > self.current_bottom_slot + num_slots - 1) {
return error.SlotOverflow;
}
@@ -179,6 +179,8 @@ pub const SlotReport = struct {

const ShredSet = std.bit_set.ArrayBitSet(usize, MAX_SHREDS_PER_SLOT / 10);

pub const SlotOutOfBounds = error{ SlotUnderflow, SlotOverflow };

const MonitoredSlot = struct {
shreds: ShredSet = ShredSet.initEmpty(),
max_seen: ?usize = null,
@@ -189,7 +191,7 @@ const MonitoredSlot = struct {
const Self = @This();

/// returns whether this is the first shred received for the slot
pub fn record(self: *Self, shred_index: usize) !bool {
pub fn record(self: *Self, shred_index: usize) bool {
if (self.is_complete) return false;
self.shreds.set(shred_index);
if (self.max_seen == null) {
@@ -201,7 +203,7 @@
return false;
}

pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) !void {
pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) Allocator.Error!void {
missing_shreds.clearRetainingCapacity();
if (self.is_complete) return;
const highest_shred_to_check = self.last_shred orelse self.max_seen orelse 0;
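Replacing the inferred !void returns with the explicit SlotOutOfBounds set is what lets callers such as shred_processor.zig switch on the error without an else branch; any error added to the set later becomes a compile error at every call site until it is handled. A standalone sketch of the idea (the bounds arithmetic is simplified, not the tracker's real logic):

const std = @import("std");

pub const SlotOutOfBounds = error{ SlotUnderflow, SlotOverflow };

fn slotIndex(slot: u64, bottom: u64, num_slots: u64) SlotOutOfBounds!u64 {
    if (slot < bottom) return error.SlotUnderflow;
    if (slot > bottom + num_slots - 1) return error.SlotOverflow;
    return slot - bottom;
}

test "explicit error sets allow exhaustive handling" {
    // No `else` branch needed: the compiler knows the complete error set.
    const index = slotIndex(5, 10, 16) catch |err| switch (err) {
        error.SlotUnderflow, error.SlotOverflow => 0,
    };
    try std.testing.expectEqual(@as(u64, 0), index);
}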
2 changes: 1 addition & 1 deletion src/utils/collections.zig
@@ -43,7 +43,7 @@ pub fn RecyclingList(
self.len = 0;
}

pub fn addOne(self: *Self) !*T {
pub fn addOne(self: *Self) Allocator.Error!*T {
if (self.len < self.private.items.len) {
const item = &self.private.items[self.len];
resetItem(item);
3 changes: 1 addition & 2 deletions src/utils/tar.zig
@@ -76,8 +76,7 @@ pub fn parallelUntarToFileSystem(
thread_pool.deinit();
}

logger.infof("using {d} threads to unpack snapshot\n", .{n_threads});

logger.infof("using {d} threads to unpack snapshot", .{n_threads});
const tasks = try UnTarTask.init(allocator, n_threads);
defer allocator.free(tasks);

