Skip to content

Commit

Permalink
feat(time): expand Instant to support shred repair
Browse files Browse the repository at this point in the history
Signed-off-by: Drew Nutter <[email protected]>
  • Loading branch information
dnut committed Jan 22, 2025
1 parent 56cea53 commit c258bfa
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 21 deletions.
6 changes: 3 additions & 3 deletions src/ledger/shred_inserter/shred_inserter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub const ShredInserter = struct {
retransmit_sender: ?PointerClosure([]const []const u8, void),
shred_tracker: ?*sig.shred_network.shred_tracker.BasicShredTracker,
) !InsertShredsResult {
const milli_timestamp = std.time.milliTimestamp();
const timestamp = sig.time.Instant.now();
///////////////////////////
// check inputs for validity and edge cases
//
Expand Down Expand Up @@ -192,7 +192,7 @@ pub const ShredInserter = struct {
switch (shred) {
.data => |data_shred| {
if (shred_tracker) |tracker| {
tracker.registerDataShred(&shred.data, milli_timestamp) catch |err| {
tracker.registerDataShred(&shred.data, timestamp) catch |err| {
switch (err) {
error.SlotUnderflow, error.SlotOverflow => {
self.metrics.register_shred_error.observe(@errorCast(err));
Expand Down Expand Up @@ -287,7 +287,7 @@ pub const ShredInserter = struct {
continue;
}
if (shred_tracker) |tracker| {
tracker.registerDataShred(&shred.data, milli_timestamp) catch |err| {
tracker.registerDataShred(&shred.data, timestamp) catch |err| {
switch (err) {
error.SlotUnderflow, error.SlotOverflow => {
self.metrics.register_shred_error.observe(@errorCast(err));
Expand Down
2 changes: 1 addition & 1 deletion src/shred_network/repair_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub const RepairService = struct {
var oldest_slot_needing_repair: u64 = 0;
var newest_slot_needing_repair: u64 = 0;
var repairs = ArrayList(RepairRequest).init(self.allocator);
if (!try self.shred_tracker.identifyMissing(&self.report, std.time.milliTimestamp())) {
if (!try self.shred_tracker.identifyMissing(&self.report, sig.time.Instant.now())) {
return repairs;
}
var individual_count: usize = 0;
Expand Down
34 changes: 17 additions & 17 deletions src/shred_network/shred_tracker.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const Mutex = std.Thread.Mutex;

const Duration = sig.time.Duration;
const Gauge = sig.prometheus.Gauge;
const Instant = sig.time.Instant;
const Registry = sig.prometheus.Registry;
const Slot = sig.core.Slot;

const MAX_SHREDS_PER_SLOT: usize = sig.ledger.shred.MAX_SHREDS_PER_SLOT;

const MIN_SLOT_AGE_TO_REPORT_AS_MISSING: u64 = 600;
const MIN_SLOT_AGE_TO_REPORT_AS_MISSING: Duration = Duration.fromMillis(600);

pub const Range = struct {
start: u32,
Expand Down Expand Up @@ -67,13 +69,13 @@ pub const BasicShredTracker = struct {
/// Registers a data shred with the tracker by unpacking the fields it needs
/// and forwarding them to `registerShred`.
///
/// Reads the parent slot via `shred.parent()` (an error from that call is
/// propagated), the `last_shred_in_slot` flag from `shred.custom.flags`, and
/// the slot and index from `shred.common`. Any error returned by
/// `registerShred` is propagated as well.
///
/// NOTE(review): this span looks like a rendered diff in which the
/// `milli_timestamp` lines are the pre-change form of the `timestamp` lines
/// that follow them; confirm which pair is live before relying on it.
pub fn registerDataShred(
self: *Self,
shred: *const sig.ledger.shred.DataShred,
milli_timestamp: i64,
timestamp: Instant,
) !void {
const parent = try shred.parent();
const is_last_in_slot = shred.custom.flags.isSet(.last_shred_in_slot);
const slot = shred.common.slot;
const index = shred.common.index;
try self.registerShred(slot, index, parent, is_last_in_slot, milli_timestamp);
try self.registerShred(slot, index, parent, is_last_in_slot, timestamp);
}

pub fn registerShred(
Expand All @@ -82,15 +84,15 @@ pub const BasicShredTracker = struct {
shred_index: u32,
parent_slot: Slot,
is_last_in_slot: bool,
milli_timestamp: i64,
timestamp: Instant,
) SlotOutOfBounds!void {
self.mux.lock();
defer self.mux.unlock();

const monitored_slot = try self.observeSlot(slot);

const slot_is_complete = monitored_slot
.record(shred_index, is_last_in_slot, milli_timestamp);
.record(shred_index, is_last_in_slot, timestamp);

if (slot > self.max_slot_processed) {
self.max_slot_processed = slot;
Expand Down Expand Up @@ -168,7 +170,7 @@ pub const BasicShredTracker = struct {
pub fn identifyMissing(
self: *Self,
slot_reports: *MultiSlotReport,
milli_timestamp: i64,
timestamp: Instant,
) (Allocator.Error || SlotOutOfBounds)!bool {
if (self.start_slot == null) return false;
self.mux.lock();
Expand All @@ -179,11 +181,9 @@ pub const BasicShredTracker = struct {
const last_slot_to_check = @max(self.max_slot_processed, self.current_bottom_slot);
for (self.current_bottom_slot..last_slot_to_check + 1) |slot| {
const monitored_slot = try self.getMonitoredSlot(slot);
if (monitored_slot.first_received_timestamp_ms +
MIN_SLOT_AGE_TO_REPORT_AS_MISSING > milli_timestamp)
{
if (timestamp.elapsedSince(monitored_slot.first_received_timestamp)
.lt(MIN_SLOT_AGE_TO_REPORT_AS_MISSING))
continue;
}
var slot_report = try slot_reports.addOne();
slot_report.slot = slot;
try monitored_slot.identifyMissing(&slot_report.missing_shreds);
Expand Down Expand Up @@ -279,15 +279,15 @@ const MonitoredSlot = struct {
shreds: ShredSet = ShredSet.initEmpty(),
max_seen: ?u32 = null,
last_shred: ?u32 = null,
first_received_timestamp_ms: i64 = 0,
first_received_timestamp: Instant = Instant.UNIX_EPOCH,
is_complete: bool = false,
parent_slot: ?Slot = null,
unique_observed_count: u32 = 0,

const Self = @This();

/// returns if the slot is *definitely* complete (there may be false negatives)
pub fn record(self: *Self, shred_index: u32, is_last_in_slot: bool, milli_timestamp: i64) bool {
pub fn record(self: *Self, shred_index: u32, is_last_in_slot: bool, timestamp: Instant) bool {
if (self.is_complete) return false;
if (!bit_set.setAndWasSet(&self.shreds, shred_index)) self.unique_observed_count += 1;

Expand All @@ -299,7 +299,7 @@ const MonitoredSlot = struct {

if (self.max_seen == null) {
self.max_seen = shred_index;
self.first_received_timestamp_ms = milli_timestamp;
self.first_received_timestamp = timestamp;
} else {
self.max_seen = @max(self.max_seen.?, shred_index);
}
Expand Down Expand Up @@ -348,7 +348,7 @@ test "trivial happy path" {

var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry());

_ = try tracker.identifyMissing(&msr, 1_000);
_ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH.plus(Duration.fromSecs(1)));

try std.testing.expect(1 == msr.len);
const report = msr.items()[0];
Expand All @@ -365,12 +365,12 @@ test "1 registered shred is identified" {
defer msr.deinit();

var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry());
try tracker.registerShred(13579, 123, 13578, false, 0);
try tracker.registerShred(13579, 123, 13578, false, Instant.UNIX_EPOCH);

_ = try tracker.identifyMissing(&msr, 0);
_ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH);
try std.testing.expectEqual(0, msr.len);

_ = try tracker.identifyMissing(&msr, 1_000);
_ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH.plus(Duration.fromSecs(1)));
try std.testing.expectEqual(1, msr.len);

const report = msr.items()[0];
Expand Down
22 changes: 22 additions & 0 deletions src/time/time.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

const std = @import("std");
const builtin = @import("builtin");
const string = []const u8;
const time = @This();

Expand Down Expand Up @@ -563,6 +564,15 @@ pub const Duration = struct {
pub const Instant = struct {
inner: std.time.Instant,

/// Selects the representation used for `inner.timestamp` in this file:
/// true for every target OS except windows, uefi and wasi. When true the
/// timestamp is handled as a `tv_sec`/`tv_nsec` pair, otherwise as a plain
/// integer.
const is_posix = builtin.os.tag != .windows and
    builtin.os.tag != .uefi and
    builtin.os.tag != .wasi;

/// The instant whose underlying timestamp is zero in every component.
///
/// NOTE(review): the name suggests the Unix epoch, but the value is simply a
/// zeroed `std.time.Instant` timestamp; confirm what clock origin that
/// corresponds to on each target before comparing it with `now()`.
pub const UNIX_EPOCH: Instant = .{ .inner = .{
    .timestamp = if (is_posix) .{ .tv_sec = 0, .tv_nsec = 0 } else 0,
} };

/// Captures the current reading of `std.time.Instant` and wraps it.
///
/// `std.time.Instant.now` returns an error union; a failure is declared
/// unreachable here, so this wrapper never returns an error to the caller.
pub fn now() Instant {
    const reading = std.time.Instant.now() catch unreachable;
    return Instant{ .inner = reading };
}
Expand All @@ -575,6 +585,18 @@ pub const Instant = struct {
return Duration.fromNanos(self.inner.since(earlier.inner));
}

/// Returns the instant that lies `duration` after `self`.
///
/// On POSIX-style targets the timestamp is a `tv_sec`/`tv_nsec` pair. The
/// duration is split into whole seconds and leftover nanoseconds *before*
/// narrowing to `isize`, so the casts only ever see values that fit (the
/// leftover is always below one second). Casting the full nanosecond count
/// in one go would trip the cast safety check for any duration larger than
/// `maxInt(isize)` nanoseconds. The nanosecond field of the result is
/// normalised with `@divFloor`/`@mod`, carrying any excess into the seconds.
///
/// On other targets the timestamp is a single integer and the duration's
/// nanosecond count is added to it directly.
pub fn plus(self: Instant, duration: Duration) Instant {
    if (is_posix) {
        const whole_secs: isize = @intCast(duration.ns / std.time.ns_per_s);
        const leftover_ns: isize = @intCast(duration.ns % std.time.ns_per_s);
        // At most one extra second can carry out of this sum when tv_nsec is
        // already normalised; @divFloor/@mod also handle the general case.
        const new_ns = self.inner.timestamp.tv_nsec + leftover_ns;
        return .{ .inner = .{ .timestamp = .{
            .tv_sec = self.inner.timestamp.tv_sec + whole_secs +
                @divFloor(new_ns, std.time.ns_per_s),
            .tv_nsec = @mod(new_ns, std.time.ns_per_s),
        } } };
    } else {
        // NOTE(review): this treats the integer timestamp as nanoseconds.
        // Confirm that holds on every non-POSIX target (on Windows the std
        // timestamp may be a performance-counter tick count rather than ns).
        return .{ .inner = .{ .timestamp = self.inner.timestamp + duration.ns } };
    }
}

pub fn format(self: @This(), comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
return try writer.print("{s}", .{std.fmt.fmtDuration(switch (@TypeOf(self.inner.timestamp)) {
u64 => self.inner.timestamp,
Expand Down

0 comments on commit c258bfa

Please sign in to comment.