From c258bfa74e1c6083270fef0be1b5c69c7e784818 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 22 Jan 2025 11:28:41 -0500 Subject: [PATCH] feat(time): expand Instant to support shred repair Signed-off-by: Drew Nutter --- src/ledger/shred_inserter/shred_inserter.zig | 6 ++-- src/shred_network/repair_service.zig | 2 +- src/shred_network/shred_tracker.zig | 34 ++++++++++---------- src/time/time.zig | 22 +++++++++++++ 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig index 10fab289a..de24c494a 100644 --- a/src/ledger/shred_inserter/shred_inserter.zig +++ b/src/ledger/shred_inserter/shred_inserter.zig @@ -150,7 +150,7 @@ pub const ShredInserter = struct { retransmit_sender: ?PointerClosure([]const []const u8, void), shred_tracker: ?*sig.shred_network.shred_tracker.BasicShredTracker, ) !InsertShredsResult { - const milli_timestamp = std.time.milliTimestamp(); + const timestamp = sig.time.Instant.now(); /////////////////////////// // check inputs for validity and edge cases // @@ -192,7 +192,7 @@ pub const ShredInserter = struct { switch (shred) { .data => |data_shred| { if (shred_tracker) |tracker| { - tracker.registerDataShred(&shred.data, milli_timestamp) catch |err| { + tracker.registerDataShred(&shred.data, timestamp) catch |err| { switch (err) { error.SlotUnderflow, error.SlotOverflow => { self.metrics.register_shred_error.observe(@errorCast(err)); @@ -287,7 +287,7 @@ pub const ShredInserter = struct { continue; } if (shred_tracker) |tracker| { - tracker.registerDataShred(&shred.data, milli_timestamp) catch |err| { + tracker.registerDataShred(&shred.data, timestamp) catch |err| { switch (err) { error.SlotUnderflow, error.SlotOverflow => { self.metrics.register_shred_error.observe(@errorCast(err)); diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig index a5f07c33c..bd3c311dd 100644 --- a/src/shred_network/repair_service.zig +++ b/src/shred_network/repair_service.zig @@ -195,7 +195,7 @@ pub const RepairService = struct { var oldest_slot_needing_repair: u64 = 0; var newest_slot_needing_repair: u64 = 0; var repairs = ArrayList(RepairRequest).init(self.allocator); - if (!try self.shred_tracker.identifyMissing(&self.report, std.time.milliTimestamp())) { + if (!try self.shred_tracker.identifyMissing(&self.report, sig.time.Instant.now())) { return repairs; } var individual_count: usize = 0; diff --git a/src/shred_network/shred_tracker.zig b/src/shred_network/shred_tracker.zig index 50506959d..7a60d28ec 100644 --- a/src/shred_network/shred_tracker.zig +++ b/src/shred_network/shred_tracker.zig @@ -5,13 +5,15 @@ const Allocator = std.mem.Allocator; const ArrayList = std.ArrayList; const Mutex = std.Thread.Mutex; +const Duration = sig.time.Duration; const Gauge = sig.prometheus.Gauge; +const Instant = sig.time.Instant; const Registry = sig.prometheus.Registry; const Slot = sig.core.Slot; const MAX_SHREDS_PER_SLOT: usize = sig.ledger.shred.MAX_SHREDS_PER_SLOT; -const MIN_SLOT_AGE_TO_REPORT_AS_MISSING: u64 = 600; +const MIN_SLOT_AGE_TO_REPORT_AS_MISSING: Duration = Duration.fromMillis(600); pub const Range = struct { start: u32, @@ -67,13 +69,13 @@ pub const BasicShredTracker = struct { pub fn registerDataShred( self: *Self, shred: *const sig.ledger.shred.DataShred, - milli_timestamp: i64, + timestamp: Instant, ) !void { const parent = try shred.parent(); const is_last_in_slot = shred.custom.flags.isSet(.last_shred_in_slot); const slot = shred.common.slot; const index = shred.common.index; - try self.registerShred(slot, index, parent, is_last_in_slot, milli_timestamp); + try self.registerShred(slot, index, parent, is_last_in_slot, timestamp); } pub fn registerShred( @@ -82,7 +84,7 @@ pub const BasicShredTracker = struct { shred_index: u32, parent_slot: Slot, is_last_in_slot: bool, - milli_timestamp: i64, + timestamp: Instant, ) SlotOutOfBounds!void { self.mux.lock(); defer self.mux.unlock(); @@ -90,7 +92,7 @@ pub const BasicShredTracker = struct { const monitored_slot = try self.observeSlot(slot); const slot_is_complete = monitored_slot - .record(shred_index, is_last_in_slot, milli_timestamp); + .record(shred_index, is_last_in_slot, timestamp); if (slot > self.max_slot_processed) { self.max_slot_processed = slot; @@ -168,7 +170,7 @@ pub const BasicShredTracker = struct { pub fn identifyMissing( self: *Self, slot_reports: *MultiSlotReport, - milli_timestamp: i64, + timestamp: Instant, ) (Allocator.Error || SlotOutOfBounds)!bool { if (self.start_slot == null) return false; self.mux.lock(); @@ -179,11 +181,9 @@ pub const BasicShredTracker = struct { const last_slot_to_check = @max(self.max_slot_processed, self.current_bottom_slot); for (self.current_bottom_slot..last_slot_to_check + 1) |slot| { const monitored_slot = try self.getMonitoredSlot(slot); - if (monitored_slot.first_received_timestamp_ms + - MIN_SLOT_AGE_TO_REPORT_AS_MISSING > milli_timestamp) - { + if (timestamp.elapsedSince(monitored_slot.first_received_timestamp) + .lt(MIN_SLOT_AGE_TO_REPORT_AS_MISSING)) continue; - } var slot_report = try slot_reports.addOne(); slot_report.slot = slot; try monitored_slot.identifyMissing(&slot_report.missing_shreds); @@ -279,7 +279,7 @@ const MonitoredSlot = struct { shreds: ShredSet = ShredSet.initEmpty(), max_seen: ?u32 = null, last_shred: ?u32 = null, - first_received_timestamp_ms: i64 = 0, + first_received_timestamp: Instant = Instant.UNIX_EPOCH, is_complete: bool = false, parent_slot: ?Slot = null, unique_observed_count: u32 = 0, @@ -287,7 +287,7 @@ const MonitoredSlot = struct { const Self = @This(); /// returns if the slot is *definitely* complete (there may be false negatives) - pub fn record(self: *Self, shred_index: u32, is_last_in_slot: bool, milli_timestamp: i64) bool { + pub fn record(self: *Self, shred_index: u32, is_last_in_slot: bool, timestamp: Instant) bool { if (self.is_complete) return false; if (!bit_set.setAndWasSet(&self.shreds, shred_index)) self.unique_observed_count += 1; @@ -299,7 +299,7 @@ const MonitoredSlot = struct { if (self.max_seen == null) { self.max_seen = shred_index; - self.first_received_timestamp_ms = milli_timestamp; + self.first_received_timestamp = timestamp; } else { self.max_seen = @max(self.max_seen.?, shred_index); } @@ -348,7 +348,7 @@ test "trivial happy path" { var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry()); - _ = try tracker.identifyMissing(&msr, 1_000); + _ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH.plus(Duration.fromSecs(1))); try std.testing.expect(1 == msr.len); const report = msr.items()[0]; @@ -365,12 +365,12 @@ test "1 registered shred is identified" { defer msr.deinit(); var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry()); - try tracker.registerShred(13579, 123, 13578, false, 0); + try tracker.registerShred(13579, 123, 13578, false, Instant.UNIX_EPOCH); - _ = try tracker.identifyMissing(&msr, 0); + _ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH); try std.testing.expectEqual(0, msr.len); - _ = try tracker.identifyMissing(&msr, 1_000); + _ = try tracker.identifyMissing(&msr, Instant.UNIX_EPOCH.plus(Duration.fromSecs(1))); try std.testing.expectEqual(1, msr.len); const report = msr.items()[0]; diff --git a/src/time/time.zig b/src/time/time.zig index 4594ee266..358684a3f 100644 --- a/src/time/time.zig +++ b/src/time/time.zig @@ -22,6 +22,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. const std = @import("std"); +const builtin = @import("builtin"); const string = []const u8; const time = @This(); @@ -563,6 +564,15 @@ pub const Duration = struct { pub const Instant = struct { inner: std.time.Instant, + pub const UNIX_EPOCH = Instant{ .inner = .{ + .timestamp = if (is_posix) .{ .tv_sec = 0, .tv_nsec = 0 } else 0, + } }; + + const is_posix = switch (builtin.os.tag) { + .windows, .uefi, .wasi => false, + else => true, + }; + pub fn now() Instant { return .{ .inner = std.time.Instant.now() catch unreachable }; } @@ -575,6 +585,18 @@ pub const Instant = struct { return Duration.fromNanos(self.inner.since(earlier.inner)); } + pub fn plus(self: Instant, duration: Duration) Instant { + if (is_posix) { + const new_ns = self.inner.timestamp.tv_nsec + @as(isize, @intCast(duration.ns)); + return .{ .inner = .{ .timestamp = .{ + .tv_sec = self.inner.timestamp.tv_sec + @divFloor(new_ns, std.time.ns_per_s), + .tv_nsec = @mod(new_ns, std.time.ns_per_s), + } } }; + } else { + return .{ .inner = .{ .timestamp = self.inner.timestamp + duration.ns } }; + } + } + pub fn format(self: @This(), comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { return try writer.print("{s}", .{std.fmt.fmtDuration(switch (@TypeOf(self.inner.timestamp)) { u64 => self.inner.timestamp,