diff --git a/build.zig b/build.zig index 4ab4680c2..c6467952f 100644 --- a/build.zig +++ b/build.zig @@ -141,6 +141,13 @@ pub fn build(b: *std.Build) void { }); b.modules.put(b.dupe("state_transition"), module_state_transition) catch @panic("OOM"); + const module_clock = b.createModule(.{ + .root_source_file = b.path("src/clock/root.zig"), + .target = target, + .optimize = optimize, + }); + b.modules.put(b.dupe("clock"), module_clock) catch @panic("OOM"); + // === Executables === const module_download_era_files = b.createModule(.{ .root_source_file = b.path("scripts/download_era_files.zig"), @@ -378,6 +385,17 @@ pub fn build(b: *std.Build) void { const tls_run_test_era = b.step("test:era", "Run the era test"); tls_run_test_era.dependOn(&run_test_era.step); tls_run_test.dependOn(&run_test_era.step); + + const test_clock = b.addTest(.{ + .name = "clock", + .root_module = module_clock, + .filters = b.option([][]const u8, "clock.filters", "clock test filters") orelse &[_][]const u8{}, + }); + const run_test_clock = b.addRunArtifact(test_clock); + const tls_run_test_clock = b.step("test:clock", "Run the clock test"); + tls_run_test_clock.dependOn(&run_test_clock.step); + tls_run_test.dependOn(&run_test_clock.step); + // Spec test modules const module_int = b.createModule(.{ .root_source_file = b.path("test/int/era/root.zig"), diff --git a/src/clock/EventClock.zig b/src/clock/EventClock.zig new file mode 100644 index 000000000..713f61e9d --- /dev/null +++ b/src/clock/EventClock.zig @@ -0,0 +1,887 @@ +//! Layer 2 – Event-driven beacon clock. +//! +//! Combines `SlotClock` with an async I/O loop to emit slot/epoch events +//! and dispatch waiters. All public methods are safe to call from the +//! main thread; the internal loop runs as a single cooperative fiber. 
+
+const std = @import("std");
+const Allocator = std.mem.Allocator;
+const slot_math = @import("slot_math.zig");
+const SlotClock = @import("SlotClock.zig");
+const time_source = @import("time_source.zig");
+
+const EventClock = @This();
+
+// Re-exported so users of EventClock don't need to import slot_math directly.
+pub const Slot = slot_math.Slot;
+pub const Epoch = slot_math.Epoch;
+pub const Config = slot_math.Config;
+pub const ListenerId = u64;
+pub const TimeSource = time_source.TimeSource;
+
+pub const Error = error{
+    InvalidConfig,
+    OutOfMemory,
+    ListenerLimitReached,
+    Aborted,
+};
+
+/// Heap-allocated bookkeeping for one pending `waitForSlot` call.
+/// Created by `waitForSlot`; destroyed by `WaitForSlotResult.await`/`cancel`.
+const WaitState = struct {
+    io: std.Io,
+    allocator: Allocator,
+    event: std.Io.Event = .unset,
+    aborted: bool = false,
+};
+
+/// Pairs a pending waiter with the slot it is waiting for.
+const WaiterEntry = struct {
+    target: Slot,
+    state: *WaitState,
+};
+
+const SlotListenerEntry = struct {
+    id: ListenerId,
+    callback: *const fn (ctx: ?*anyopaque, slot: Slot) void,
+    ctx: ?*anyopaque,
+};
+
+const EpochListenerEntry = struct {
+    id: ListenerId,
+    callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void,
+    ctx: ?*anyopaque,
+};
+
+/// Copy of a slot listener taken before dispatch, so callbacks may
+/// add/remove listeners without invalidating the iteration (see emitSlot).
+const SlotSnapshot = struct {
+    callback: *const fn (ctx: ?*anyopaque, slot: Slot) void,
+    ctx: ?*anyopaque,
+};
+
+/// Epoch-side counterpart of `SlotSnapshot` (see emitEpoch).
+const EpochSnapshot = struct {
+    callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void,
+    ctx: ?*anyopaque,
+};
+
+/// Min-heap of pending waiters ordered by target slot, so dispatch can
+/// stop at the first entry whose target is still in the future.
+const WaiterQueue = std.PriorityQueue(WaiterEntry, void, struct {
+    fn compare(_: void, a: WaiterEntry, b: WaiterEntry) std.math.Order {
+        return std.math.order(a.target, b.target);
+    }
+}.compare);
+
+allocator: Allocator,
+io: std.Io,
+clock: SlotClock,
+
+// Set by stop(); checked by the auto-advance loop and waitForSlot's fast path.
+stopped: bool = false,
+// Non-null from start() until join() takes ownership of the loop fiber.
+loop_future: ?std.Io.Future(void) = null,
+
+next_listener_id: ListenerId = 1,
+slot_listeners: std.ArrayListUnmanaged(SlotListenerEntry) = .empty,
+epoch_listeners: std.ArrayListUnmanaged(EpochListenerEntry) = .empty,
+// Scratch buffers reused by emitSlot/emitEpoch; capacity is reserved at
+// listener registration time so dispatch itself never allocates.
+slot_snapshot: std.ArrayListUnmanaged(SlotSnapshot) = .empty,
+epoch_snapshot: std.ArrayListUnmanaged(EpochSnapshot) = .empty,
+
+waiters: WaiterQueue,
+
+/// Initialise in-place.
+pub fn init(self: *EventClock, allocator: Allocator, config: Config, io_handle: std.Io) Error!void {
+    self.* = .{
+        .allocator = allocator,
+        .io = io_handle,
+        .clock = undefined,
+        .waiters = WaiterQueue.initContext({}),
+    };
+    // Config validation happens inside SlotClock.init; any failure there
+    // surfaces uniformly as error.InvalidConfig.
+    // NOTE(review): WaiterQueue.initContext / push signatures assume the
+    // unmanaged PriorityQueue API — confirm against the targeted std version.
+    self.clock = SlotClock.init(config, .{ .real = .{ .io = io_handle } }) catch return error.InvalidConfig;
+}
+
+/// Start the auto-advance loop. Idempotent; second call is a no-op.
+pub fn start(self: *EventClock) void {
+    // A non-null future means the loop fiber already exists — keep it.
+    if (self.loop_future != null) return;
+    self.loop_future = std.Io.async(self.io, EventClock.runAutoLoop, .{self});
+}
+
+/// Signal the loop to stop and abort all pending waiters. Idempotent.
+/// Does not wait for the loop fiber — use `join()` for that.
+pub fn stop(self: *EventClock) void {
+    if (self.stopped) return;
+    self.stopped = true;
+    self.abortAllWaiters();
+}
+
+/// Signal the loop to stop, cancel the fiber, and wait for it to finish.
+/// Safe to call even if `start()` was never called; idempotent.
+pub fn join(self: *EventClock) void {
+    self.stop();
+    // Take ownership of the future first so a re-entrant/second join()
+    // sees null and becomes a no-op.
+    var maybe_future = self.loop_future;
+    self.loop_future = null;
+    if (maybe_future) |*future| {
+        future.cancel(self.io);
+        future.await(self.io);
+    }
+}
+
+/// Release all resources. Calls `stop()` + `join()` internally.
+pub fn deinit(self: *EventClock) void {
+    // stop() is idempotent; join() calls it again harmlessly.
+    self.stop();
+    self.join();
+    self.slot_snapshot.deinit(self.allocator);
+    self.epoch_snapshot.deinit(self.allocator);
+    self.slot_listeners.deinit(self.allocator);
+    self.epoch_listeners.deinit(self.allocator);
+    self.waiters.deinit(self.allocator);
+    self.* = undefined;
+}
+
+// ── Listener API ──
+// NOTE: Listeners should be registered before calling `start()`.
+// Adding listeners from within a callback may silently skip the new listener
+// until the next slot, because snapshot buffers are pre-allocated at registration
+// time and emitSlot/emitEpoch only use pre-allocated capacity.
+
+/// Register a slot listener. Returns an ID for later removal via `offSlot`.
+pub fn onSlot(
+    self: *EventClock,
+    callback: *const fn (ctx: ?*anyopaque, slot: Slot) void,
+    ctx: ?*anyopaque,
+) Error!ListenerId {
+    // Refuse once the ID space is exhausted (maxInt itself is never handed out).
+    if (self.next_listener_id == std.math.maxInt(ListenerId))
+        return error.ListenerLimitReached;
+    // Pre-allocate snapshot buffer BEFORE appending the listener, so that
+    // if OOM occurs we haven't modified any state yet.
+    self.slot_snapshot.ensureTotalCapacity(
+        self.allocator,
+        self.slot_listeners.items.len + 1,
+    ) catch return error.OutOfMemory;
+    self.slot_listeners.append(self.allocator, .{
+        .id = self.next_listener_id,
+        .callback = callback,
+        .ctx = ctx,
+    }) catch return error.OutOfMemory;
+    const id = self.next_listener_id;
+    self.next_listener_id += 1;
+    return id;
+}
+
+/// Unregister a slot listener. Returns `true` if found and removed.
+pub fn offSlot(self: *EventClock, id: ListenerId) bool {
+    // Linear scan; orderedRemove keeps remaining listeners in registration order.
+    for (self.slot_listeners.items, 0..) |listener, i| {
+        if (listener.id == id) {
+            _ = self.slot_listeners.orderedRemove(i);
+            return true;
+        }
+    }
+    return false;
+}
+
+/// Register an epoch listener. Returns an ID for later removal via `offEpoch`.
+/// Slot and epoch listeners share one ID sequence (`next_listener_id`).
+pub fn onEpoch(
+    self: *EventClock,
+    callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void,
+    ctx: ?*anyopaque,
+) Error!ListenerId {
+    if (self.next_listener_id == std.math.maxInt(ListenerId))
+        return error.ListenerLimitReached;
+    // Pre-allocate snapshot buffer BEFORE appending the listener, so that
+    // if OOM occurs we haven't modified any state yet.
+    self.epoch_snapshot.ensureTotalCapacity(
+        self.allocator,
+        self.epoch_listeners.items.len + 1,
+    ) catch return error.OutOfMemory;
+    self.epoch_listeners.append(self.allocator, .{
+        .id = self.next_listener_id,
+        .callback = callback,
+        .ctx = ctx,
+    }) catch return error.OutOfMemory;
+    const id = self.next_listener_id;
+    self.next_listener_id += 1;
+    return id;
+}
+
+/// Unregister an epoch listener. Returns `true` if found and removed.
+pub fn offEpoch(self: *EventClock, id: ListenerId) bool {
+    for (self.epoch_listeners.items, 0..) |listener, i| {
+        if (listener.id == id) {
+            _ = self.epoch_listeners.orderedRemove(i);
+            return true;
+        }
+    }
+    return false;
+}
+
+// ── Delegated read APIs ──
+// Every public accessor that exposes "current" slot/epoch state calls catchUp()
+// first, matching the TS version where `get currentSlot()` triggers event
+// emission before returning. Pure time-arithmetic helpers (slotWithFutureTolerance,
+// secFromSlot, etc.) do NOT catch up, matching TS which doesn't go through
+// `this.currentSlot` for those.
+
+/// Current wall-clock slot, or null before genesis. Emits pending events first.
+pub fn currentSlot(self: *EventClock) ?Slot {
+    self.catchUp();
+    return self.clock.currentSlot();
+}
+
+/// Current epoch, or null before genesis. Emits pending events first.
+pub fn currentEpoch(self: *EventClock) ?Epoch {
+    self.catchUp();
+    return self.clock.currentEpoch();
+}
+
+/// Current slot, falling back to slot 0 pre-genesis. Emits pending events first.
+pub fn currentSlotOrGenesis(self: *EventClock) Slot {
+    self.catchUp();
+    return self.clock.currentSlotOrGenesis();
+}
+
+/// Current epoch, falling back to epoch 0 pre-genesis. Emits pending events first.
+pub fn currentEpochOrGenesis(self: *EventClock) Epoch {
+    self.catchUp();
+    return self.clock.currentEpochOrGenesis();
+}
+
+pub fn currentSlotWithGossipDisparity(self: *EventClock) Slot {
+    self.catchUp();
+    return self.clock.currentSlotWithGossipDisparity();
+}
+
+pub fn isCurrentSlotGivenGossipDisparity(self: *EventClock, slot: Slot) bool {
+    self.catchUp();
+    return self.clock.isCurrentSlotGivenGossipDisparity(slot);
+}
+
+// Pure time arithmetic below — deliberately no catchUp() (see note above).
+
+pub fn slotWithFutureTolerance(self: *EventClock, tolerance_ms: u64) ?Slot {
+    return self.clock.slotWithFutureTolerance(tolerance_ms);
+}
+
+pub fn slotWithPastTolerance(self: *EventClock, tolerance_ms: u64) ?Slot {
+    return self.clock.slotWithPastTolerance(tolerance_ms);
+}
+
+pub fn secFromSlot(self: *EventClock, slot: Slot, to_sec: ?slot_math.UnixSec) ?i64 {
+    return self.clock.secFromSlot(slot, to_sec);
+}
+
+pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 {
+    return self.clock.msFromSlot(slot, to_ms);
+}
+
+// ── waitForSlot ──
+
+/// Return type from `waitForSlot`. The caller MUST either:
+/// - call `await()` to wait for the target slot and release resources, OR
+/// - call `cancel()` to abort and release resources, OR
+/// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`.
+/// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks
+/// the internal WaitState.
+///
+/// Idiomatic usage with `errdefer`:
+///     var fut = try ec.waitForSlot(target);
+///     errdefer fut.cancel();
+///     try fut.await(io);
+pub const WaitForSlotResult = struct {
+    inner: std.Io.Future(Error!void),
+    state: ?*WaitState,
+    clock: ?*EventClock,
+
+    /// Create an immediately-resolved result (no async work needed).
+    /// Relies on `std.Io.Future.await` returning `.result` when `.any_future == null`.
+    fn immediate(result: Error!void) WaitForSlotResult {
+        return .{ .inner = .{ .any_future = null, .result = result }, .state = null, .clock = null };
+    }
+
+    /// Wait for the target slot (or abort) and release the WaitState.
+    pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void {
+        const result = self.inner.await(io);
+        // Free AFTER await returns — workaround for Zig futex use-after-free
+        // where GCD still holds a reference to the event address after wake.
+        if (self.state) |s| s.allocator.destroy(s);
+        self.state = null;
+        self.clock = null;
+        return result;
+    }
+
+    /// Abort a pending wait and release its resources. Idempotent — safe
+    /// to call on an already-awaited, already-cancelled, or immediate result.
+    ///
+    /// Typical usage:
+    ///     var fut = try ec.waitForSlot(target);
+    ///     errdefer fut.cancel();
+    ///     try fut.await(io);
+    pub fn cancel(self: *WaitForSlotResult) void {
+        const state = self.state orelse return;
+        // Remove from waiter queue before freeing, so abortAllWaiters
+        // won't dereference the freed state pointer.
+        // NOTE(review): assumes PriorityQueue exposes `.items` and `popIndex`
+        // — confirm against the targeted std version.
+        if (self.clock) |clock| {
+            for (clock.waiters.items, 0..) |entry, i| {
+                if (entry.state == state) {
+                    _ = clock.waiters.popIndex(i);
+                    break;
+                }
+            }
+        }
+        state.aborted = true;
+        state.event.set(state.io);
+        // Must await the fiber so it finishes before we free its state.
+        // The fiber returns error.Aborted (expected) or {} (already dispatched).
+        _ = self.inner.await(state.io) catch |err| {
+            std.debug.assert(err == error.Aborted);
+        };
+        state.allocator.destroy(state);
+        self.state = null;
+        self.clock = null;
+    }
+};
+
+/// Return a future that resolves when the clock reaches `target`.
+/// See `WaitForSlotResult` for the caller's obligations.
+pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult {
+    if (self.stopped) {
+        return WaitForSlotResult.immediate(error.Aborted);
+    }
+    // Catch up events then check fast-path against advanced state.
+    self.catchUp();
+    if (self.clock.current_slot) |slot| {
+        if (slot >= target) {
+            return WaitForSlotResult.immediate({});
+        }
+    }
+
+    const state = self.allocator.create(WaitState) catch return error.OutOfMemory;
+    errdefer self.allocator.destroy(state);
+
+    state.* = .{
+        .io = self.io,
+        .allocator = self.allocator,
+    };
+
+    // Re-check after catchUp: a listener callback invoked above may have
+    // called stop(), in which case registering a waiter would never resolve.
+    if (self.stopped) {
+        self.allocator.destroy(state);
+        return WaitForSlotResult.immediate(error.Aborted);
+    }
+    self.waiters.push(self.allocator, .{ .target = target, .state = state }) catch return error.OutOfMemory;
+    // Dispatch immediately so an already-reached target resolves without
+    // waiting for the next loop tick.
+    self.dispatchWaiters(self.clock.current_slot);
+
+    return .{
+        .inner = std.Io.async(self.io, waitForSlotFutureAwait, .{state}),
+        .state = state,
+        .clock = self,
+    };
+}
+
+// ── Private ──
+
+/// Ensure event-clock state is caught up to wall-clock time.
+/// Emits any intermediate slot/epoch events to listeners.
+/// No-op if already caught up or pre-genesis (currentSlot() returns null).
+/// Safe to call from any fiber — cooperative scheduling guarantees no
+/// concurrent access (same model as TS's single-threaded event loop).
+fn catchUp(self: *EventClock) void { + if (self.clock.currentSlot()) |wall_slot| { + self.advanceAndDispatch(wall_slot); + } +} + +fn emitSlot(self: *EventClock, slot: Slot) void { + self.slot_snapshot.clearRetainingCapacity(); + // Use only pre-allocated capacity — no allocation in fiber context. + const limit = @min(self.slot_listeners.items.len, self.slot_snapshot.capacity); + for (self.slot_listeners.items[0..limit]) |listener| { + self.slot_snapshot.appendAssumeCapacity(.{ + .callback = listener.callback, + .ctx = listener.ctx, + }); + } + + for (self.slot_snapshot.items) |listener| { + listener.callback(listener.ctx, slot); + } +} + +fn emitEpoch(self: *EventClock, epoch: Epoch) void { + self.epoch_snapshot.clearRetainingCapacity(); + // Use only pre-allocated capacity — no allocation in fiber context. + const limit = @min(self.epoch_listeners.items.len, self.epoch_snapshot.capacity); + for (self.epoch_listeners.items[0..limit]) |listener| { + self.epoch_snapshot.appendAssumeCapacity(.{ + .callback = listener.callback, + .ctx = listener.ctx, + }); + } + + for (self.epoch_snapshot.items) |listener| { + listener.callback(listener.ctx, epoch); + } +} + +fn dispatchWaiters(self: *EventClock, current_slot: ?Slot) void { + const slot = current_slot orelse return; + while (self.waiters.peek()) |head| { + if (head.target > slot) break; + const waiter = self.waiters.pop().?; + waiter.state.aborted = false; + waiter.state.event.set(waiter.state.io); + } +} + +fn abortAllWaiters(self: *EventClock) void { + while (self.waiters.pop()) |waiter| { + waiter.state.aborted = true; + waiter.state.event.set(waiter.state.io); + } +} + +fn advanceAndDispatch(self: *EventClock, target: Slot) void { + var iter = self.clock.advanceTo(target); + while (iter.next()) |event| { + if (self.stopped) break; + switch (event) { + .slot => |s| { + self.emitSlot(s); + self.dispatchWaiters(s); + }, + .epoch => |e| self.emitEpoch(e), + } + } + // Defensive: handles edge cases where advanceTo 
yields zero events + // (already at target) but waiters were added between loop ticks. + // In the normal case, this is a no-op because the last .slot event + // already dispatched waiters at the same slot value. + self.dispatchWaiters(self.clock.current_slot); +} + +fn runAutoLoop(self: *EventClock) void { + while (!self.stopped) { + const now_ms = self.clock.time.nowMs(); + // Config validation guarantees sec→ms won't overflow, so null here + // indicates a logic bug. Break instead of spinning at 1ms. + const next_ms = slot_math.msUntilNextSlot(self.clock.config, now_ms) orelse { + std.log.err("EventClock: msUntilNextSlot returned null (config overflow?), stopping loop", .{}); + self.stop(); + break; + }; + const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64); + + // Sleep failure: cancellation (from join()) exits the loop; + // other errors re-check the stopped flag. + std.Io.sleep( + self.io, + std.Io.Duration.fromMilliseconds(sleep_ms), + .awake, + ) catch |err| { + if (err == error.Canceled) break; + std.log.debug("EventClock: sleep failed ({s}), retrying", .{@errorName(err)}); + continue; + }; + + if (self.stopped) break; + // Only advance after genesis. Before genesis currentSlot() returns + // null — skipping here prevents emitting slot 0 prematurely. + if (self.clock.currentSlot()) |slot| { + self.advanceAndDispatch(slot); + } + } +} + +fn waitForSlotFutureAwait(state: *WaitState) Error!void { + // NOTE: Do NOT free state here. The caller (WaitForSlotResult.await) frees + // it AFTER this future completes — workaround for Zig futex use-after-free + // where GCD still holds a reference to the event address after wake. 
+ state.event.waitUncancelable(state.io); + if (state.aborted) return error.Aborted; +} + +// ── Tests ── + +const testing = std.testing; + +const TestIo = struct { + evented: std.Io.Evented = undefined, + + fn init(self: *TestIo) !void { + self.* = .{ .evented = undefined }; + try self.evented.init(std.heap.page_allocator, .{}); + } + + fn deinit(self: *TestIo) void { + self.evented.deinit(); + } + + fn io(self: *TestIo) std.Io { + return self.evented.io(); + } +}; + +fn nowSecAt(io_handle: std.Io) u64 { + const sec = std.Io.Clock.real.now(io_handle).toSeconds(); + std.debug.assert(sec >= 0); + return @intCast(sec); +} + +fn nowMsAt(io_handle: std.Io) u64 { + const ms = std.Io.Clock.real.now(io_handle).toMilliseconds(); + std.debug.assert(ms >= 0); + return @intCast(ms); +} + +const EventTraceState = struct { + slots: [64]Slot = undefined, + slot_len: usize = 0, + epochs: [64]u64 = undefined, + epoch_len: usize = 0, + + fn onSlot(ctx: ?*anyopaque, slot: Slot) void { + const self: *EventTraceState = @ptrCast(@alignCast(ctx.?)); + if (self.slot_len >= self.slots.len) return; + self.slots[self.slot_len] = slot; + self.slot_len += 1; + } + + fn onEpoch(ctx: ?*anyopaque, epoch: u64) void { + const self: *EventTraceState = @ptrCast(@alignCast(ctx.?)); + if (self.epoch_len >= self.epochs.len) return; + self.epochs[self.epoch_len] = epoch; + self.epoch_len += 1; + } +}; + +test "lifecycle: init -> register -> start -> receive events -> stop" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + var fut = try 
clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); + try fut.await(io_handle); + + try testing.expect(trace.slot_len > 0); +} + +test "waitForSlot resolves immediately when at target" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + const current = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(current); + errdefer fut.cancel(); + try fut.await(io_handle); +} + +test "waitForSlot returns aborted on stop" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .seconds_per_slot = 2, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var fut = try clock.waitForSlot(100); + errdefer fut.cancel(); + clock.stop(); + try testing.expectError(error.Aborted, fut.await(io_handle)); +} + +test "offSlot/offEpoch stop event delivery" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .seconds_per_slot = 1, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + const slot_id = try clock.onSlot(EventTraceState.onSlot, &trace); + const epoch_id = try clock.onEpoch(EventTraceState.onEpoch, &trace); + try testing.expect(clock.offSlot(slot_id)); + try testing.expect(clock.offEpoch(epoch_id)); + + clock.advanceAndDispatch(6); + try testing.expectEqual(@as(usize, 0), trace.slot_len); + try testing.expectEqual(@as(usize, 0), trace.epoch_len); +} + +test "stop/join are 
idempotent" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .seconds_per_slot = 2, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + clock.stop(); + clock.stop(); + clock.join(); + clock.join(); +} + +test "epoch event is delivered when crossing epoch boundary" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .seconds_per_slot = 1, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + _ = try clock.onEpoch(EventTraceState.onEpoch, &trace); + + // Advance from null through epoch boundary at slot 4 + clock.advanceAndDispatch(5); + + try testing.expect(trace.slot_len > 0); + try testing.expect(trace.epoch_len > 0); + try testing.expectEqual(@as(u64, 1), trace.epochs[0]); +} + +test "multiple waiters are dispatched in target-slot order" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 10, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + // Register waiters for slots 5, 3, 1 (out of order) + var fut5 = try clock.waitForSlot(5); + errdefer fut5.cancel(); + var fut3 = try clock.waitForSlot(3); + errdefer fut3.cancel(); + var fut1 = try clock.waitForSlot(1); + errdefer fut1.cancel(); + + // Advance to slot 3 — should dispatch slot 1 and slot 3, NOT slot 5 + clock.advanceAndDispatch(3); + + try fut1.await(io_handle); + try fut3.await(io_handle); + + // fut5 should still be pending. 
Stop to abort it. + clock.stop(); + try testing.expectError(error.Aborted, fut5.await(io_handle)); +} + +test "cancel releases WaitState without awaiting" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 10, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + // Create a waiter for a far-future slot and immediately cancel it. + // testing.allocator will detect a leak if cancel fails to free. + var fut = try clock.waitForSlot(999); + fut.cancel(); +} + +// ── Real-time tests ── +// These tests exercise `runAutoLoop` (the production code path) by calling +// `clock.start()` and letting wall-clock time drive slot advancement. + +test "real-time: no slot events emitted before genesis" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 5, // genesis 5s in the future + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + clock.start(); + + // Sleep 1.5s — the loop will tick several times, all pre-genesis. + std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(1500), .awake) catch {}; + + // No slot events should have been emitted before genesis. 
+ try testing.expectEqual(@as(usize, 0), trace.slot_len); +} + +test "real-time: slot events fire with correct timing" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + const before_ms = nowMsAt(io_handle); + var fut = try clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); + try fut.await(io_handle); + const elapsed = nowMsAt(io_handle) - before_ms; + + // Should wait roughly 0-1s for the next slot boundary. + // Generous upper bound avoids flaky CI. + try testing.expect(elapsed < 2000); + try testing.expect(trace.slot_len > 0); + // The delivered slot number must match or exceed our target. + try testing.expect(trace.slots[trace.slot_len - 1] >= start_slot + 1); +} + +test "real-time: multi-slot advancement delivers ordered events" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .seconds_per_slot = 1, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(start_slot + 2); + errdefer fut.cancel(); + try fut.await(io_handle); + + // At least 2 slot events should have been emitted. + try testing.expect(trace.slot_len >= 2); + // Slots must be in strictly ascending order. 
+ for (1..trace.slot_len) |i| { + try testing.expect(trace.slots[i] > trace.slots[i - 1]); + } +} + +test "real-time: stop+join cancels promptly" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 100, // far future + .seconds_per_slot = 12, // long slot like mainnet + .slots_per_epoch = 32, + }, io_handle); + defer clock.deinit(); + + clock.start(); + + // Give the loop fiber time to enter its sleep. + std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {}; + + const before_ms = nowMsAt(io_handle); + clock.stop(); + clock.join(); + const elapsed = nowMsAt(io_handle) - before_ms; + + // join() cancels the sleeping future directly, so it should return + // almost immediately — NOT after the full 12-second slot duration. + try testing.expect(elapsed < 1500); +} + +test "real-time: epoch boundary event fires" { + var rt: TestIo = undefined; + try rt.init(); + defer rt.deinit(); + const io_handle = rt.io(); + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .seconds_per_slot = 1, + .slots_per_epoch = 2, // epoch boundary every 2 slots + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + _ = try clock.onEpoch(EventTraceState.onEpoch, &trace); + + clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + // Wait enough slots to guarantee crossing at least one epoch boundary. + var fut = try clock.waitForSlot(start_slot + 3); + errdefer fut.cancel(); + try fut.await(io_handle); + + try testing.expect(trace.slot_len >= 3); + // Must have seen at least one epoch transition. 
+ try testing.expect(trace.epoch_len > 0); +} diff --git a/src/clock/SlotClock.zig b/src/clock/SlotClock.zig new file mode 100644 index 000000000..4f1eabdd2 --- /dev/null +++ b/src/clock/SlotClock.zig @@ -0,0 +1,330 @@ +//! Layer 1 – Stateful slot clock. +//! +//! Wraps `slot_math` with a `TimeSource` and a cached `current_slot`. +//! Pure-read helpers query wall-clock time; only `advanceTo()` mutates the cache. + +const std = @import("std"); +const slot_math = @import("slot_math.zig"); +const time_source = @import("time_source.zig"); + +const SlotClock = @This(); + +pub const Slot = slot_math.Slot; +pub const Epoch = slot_math.Epoch; +pub const Config = slot_math.Config; +pub const TimeSource = time_source.TimeSource; + +pub const Event = union(enum) { + slot: Slot, + epoch: Epoch, +}; + +pub const AdvanceIterator = struct { + clock: *SlotClock, + target: Slot, + pending_epoch: ?Epoch = null, + + /// Advances the clock one step at a time, yielding slot and epoch events. + /// For each slot advancement: yields .slot first, then .epoch if an epoch boundary was crossed. + /// Returns null when caught up to target. 
+ pub fn next(self: *AdvanceIterator) ?Event { + // If we have a pending epoch event from the previous step, emit it now + if (self.pending_epoch) |epoch| { + self.pending_epoch = null; + return .{ .epoch = epoch }; + } + + const current = self.clock.current_slot; + + // Genesis case: current_slot is null, advance to slot 0 + if (current == null) { + self.clock.current_slot = 0; + return .{ .slot = 0 }; + } + + const cur = current.?; + if (cur >= self.target) return null; + if (cur == std.math.maxInt(Slot)) return null; + + const next_slot = cur + 1; + self.clock.current_slot = next_slot; + + // Check epoch boundary — epochAtSlot returns ?Epoch + const prev_epoch = slot_math.epochAtSlot(self.clock.config, cur); + const new_epoch = slot_math.epochAtSlot(self.clock.config, next_slot); + if (prev_epoch) |prev_ep| { + if (new_epoch) |new_ep| { + if (prev_ep < new_ep) { + self.pending_epoch = new_ep; + } + } + } + + return .{ .slot = next_slot }; + } +}; + +config: Config, +time: TimeSource, +current_slot: ?Slot = null, + +pub fn init(config: Config, time: TimeSource) error{InvalidConfig}!SlotClock { + try config.validate(); + var self = SlotClock{ + .config = config, + .time = time, + }; + self.current_slot = slot_math.slotAtMs(config, time.nowMs()); + return self; +} + +/// Returns the current wall-clock slot. Pure read — does NOT update +/// the internal `current_slot` cache. Only `advanceTo()` advances the cache. 
+pub fn currentSlot(self: *const SlotClock) ?Slot {
+    const now_ms = self.time.nowMs();
+    return slot_math.slotAtMs(self.config, now_ms);
+}
+
+pub fn currentEpoch(self: *const SlotClock) ?Epoch {
+    const slot = self.currentSlot() orelse return null;
+    return slot_math.epochAtSlot(self.config, slot);
+}
+
+pub fn currentSlotOrGenesis(self: *const SlotClock) Slot {
+    return self.currentSlot() orelse 0;
+}
+
+pub fn currentEpochOrGenesis(self: *const SlotClock) Epoch {
+    return self.currentEpoch() orelse 0;
+}
+
+pub fn currentSlotWithGossipDisparity(self: *const SlotClock) Slot {
+    const current = self.currentSlotOrGenesis();
+    if (current == std.math.maxInt(Slot)) return current; // guard: current + 1 below would overflow
+    const now_ms = self.time.nowMs();
+    const next_slot = current + 1;
+    const next_slot_ms = slot_math.slotStartMs(self.config, next_slot) orelse return current;
+    return if (next_slot_ms -| now_ms < self.config.maximum_gossip_clock_disparity_ms) next_slot else current;
+}
+
+pub fn isCurrentSlotGivenGossipDisparity(self: *const SlotClock, slot: Slot) bool {
+    const current = self.currentSlotOrGenesis();
+    if (slot == current) return true;
+
+    const now_ms = self.time.nowMs();
+
+    // Case 1: within the disparity window *before* the next slot boundary — accept next_slot early
+    if (current != std.math.maxInt(Slot)) {
+        const next_slot = current + 1;
+        const next_slot_ms = slot_math.slotStartMs(self.config, next_slot) orelse return false;
+        if (next_slot_ms -| now_ms < self.config.maximum_gossip_clock_disparity_ms) {
+            return slot == next_slot;
+        }
+    }
+
+    // Case 2: within the disparity window just *after* this slot's start — still accept the previous slot
+    if (current > 0) {
+        const current_slot_ms = slot_math.slotStartMs(self.config, current) orelse return false;
+        if (now_ms -| current_slot_ms < self.config.maximum_gossip_clock_disparity_ms) {
+            return slot == current - 1;
+        }
+    }
+
+    return false;
+}
+
+pub fn slotWithFutureTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
+    const now_ms = self.time.nowMs();
+    const shifted = @addWithOverflow(now_ms, tolerance_ms);
+    if (shifted[1] != 0) return null; // overflow bit set: shifted time is unrepresentable
+    return slot_math.slotAtMs(self.config, shifted[0]);
+}
+
+pub fn slotWithPastTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
+    const now_ms = self.time.nowMs();
+    // Checked sub: underflow (pre-UNIX-epoch) returns null.
+    // Pre-genesis but valid timestamp returns 0.
+    const shifted_ms = std.math.sub(u64, now_ms, tolerance_ms) catch return null;
+    return slot_math.slotAtMs(self.config, shifted_ms) orelse 0;
+}
+
+pub fn secFromSlot(self: *const SlotClock, slot: Slot, to_sec: ?slot_math.UnixSec) ?i64 {
+    const from_sec = slot_math.slotStartSec(self.config, slot) orelse return null;
+    const end_sec = to_sec orelse @divFloor(self.time.nowMs(), 1000);
+    const diff = @as(i128, @intCast(end_sec)) - @as(i128, @intCast(from_sec)); // widen to i128 so the subtraction itself cannot overflow
+    if (diff < std.math.minInt(i64) or diff > std.math.maxInt(i64)) return null;
+    return @intCast(diff);
+}
+
+pub fn msFromSlot(self: *const SlotClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 {
+    const from_ms = slot_math.slotStartMs(self.config, slot) orelse return null;
+    const end_ms = to_ms orelse self.time.nowMs();
+    const diff = @as(i128, @intCast(end_ms)) - @as(i128, @intCast(from_ms)); // widen to i128 so the subtraction itself cannot overflow
+    if (diff < std.math.minInt(i64) or diff > std.math.maxInt(i64)) return null;
+    return @intCast(diff);
+}
+
+pub fn advanceTo(self: *SlotClock, target: Slot) AdvanceIterator {
+    return .{
+        .clock = self,
+        .target = target,
+    };
+}
+
+const testing = std.testing;
+
+const test_cfg = Config{
+    .genesis_time_sec = 100,
+    .seconds_per_slot = 12,
+    .slots_per_epoch = 32,
+    .maximum_gossip_clock_disparity_ms = 500,
+};
+
+test "pre-genesis returns null, genesis fallback returns zero" {
+    var fake = time_source.FakeTime{ .ms = 99_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?Slot, null), clock.currentSlot());
+    try testing.expectEqual(@as(?Epoch, null), clock.currentEpoch());
+    try testing.expectEqual(@as(Slot, 0), clock.currentSlotOrGenesis());
+    try testing.expectEqual(@as(Epoch, 0), clock.currentEpochOrGenesis());
+}
+
+test "currentSlot at genesis and advancing" {
+    var fake = time_source.FakeTime{ .ms = 100_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?Slot, 0), clock.currentSlot());
+
+    fake.setMs(112_000);
+    try testing.expectEqual(@as(?Slot, 1), clock.currentSlot());
+
+    fake.setMs(124_000);
+    try testing.expectEqual(@as(?Slot, 2), clock.currentSlot());
+}
+
+test "currentEpoch" {
+    var fake = time_source.FakeTime{ .ms = 100_000 + 32 * 12 * 1000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?Epoch, 1), clock.currentEpoch());
+}
+
+test "advanceTo produces correct slot events" {
+    var fake = time_source.FakeTime{ .ms = 100_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+
+    var events: [16]Event = undefined;
+    var count: usize = 0;
+    var iter = clock.advanceTo(3);
+    while (iter.next()) |e| {
+        events[count] = e;
+        count += 1;
+    }
+
+    try testing.expectEqual(@as(usize, 3), count);
+    try testing.expect(events[0] == .slot and events[0].slot == 1);
+    try testing.expect(events[1] == .slot and events[1].slot == 2);
+    try testing.expect(events[2] == .slot and events[2].slot == 3);
+    try testing.expectEqual(@as(?Slot, 3), clock.current_slot);
+}
+
+test "advanceTo across epoch boundary emits slot then epoch" {
+    var fake = time_source.FakeTime{ .ms = 100_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    clock.current_slot = 31;
+
+    var events: [16]Event = undefined;
+    var count: usize = 0;
+    var iter = clock.advanceTo(33);
+    while (iter.next()) |e| {
+        events[count] = e;
+        count += 1;
+    }
+
+    try testing.expectEqual(@as(usize, 3), count);
+    try testing.expect(events[0] == .slot and events[0].slot == 32);
+    try testing.expect(events[1] == .epoch and events[1].epoch == 1);
+    try testing.expect(events[2] == .slot and events[2].slot == 33);
+}
+
+test "advanceTo from null (pre-genesis)" {
+    var fake = time_source.FakeTime{ .ms = 99_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?Slot, null), clock.current_slot);
+
+    var events: [16]Event = undefined;
+    var count: usize = 0;
+    var iter = clock.advanceTo(2);
+    while (iter.next()) |e| {
+        events[count] = e;
+        count += 1;
+    }
+
+    try testing.expectEqual(@as(usize, 3), count);
+    try testing.expect(events[0] == .slot and events[0].slot == 0);
+    try testing.expect(events[1] == .slot and events[1].slot == 1);
+    try testing.expect(events[2] == .slot and events[2].slot == 2);
+}
+
+test "advanceTo already at target returns nothing" {
+    var fake = time_source.FakeTime{ .ms = 112_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+
+    var count: usize = 0;
+    var iter = clock.advanceTo(1);
+    while (iter.next()) |_| count += 1;
+    try testing.expectEqual(@as(usize, 0), count);
+}
+
+test "gossip disparity: far from boundary" {
+    var fake = time_source.FakeTime{ .ms = 103_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(Slot, 0), clock.currentSlotWithGossipDisparity());
+    try testing.expect(clock.isCurrentSlotGivenGossipDisparity(0));
+    try testing.expect(!clock.isCurrentSlotGivenGossipDisparity(1));
+}
+
+test "gossip disparity: near next slot boundary" {
+    var fake = time_source.FakeTime{ .ms = 111_600 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(Slot, 1), clock.currentSlotWithGossipDisparity());
+    try testing.expect(clock.isCurrentSlotGivenGossipDisparity(1));
+}
+
+test "gossip disparity: just after slot boundary" {
+    var fake = time_source.FakeTime{ .ms = 112_300 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expect(clock.isCurrentSlotGivenGossipDisparity(0));
+}
+
+test "gossip disparity: exact threshold (500ms) does NOT apply" {
+    // next_slot_ms - now_ms == 500 → NOT < 500, so disparity doesn't apply
+    // Slot 1 starts at 112_000ms. 500ms before = 111_500ms.
+    var fake = time_source.FakeTime{ .ms = 111_500 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    // At exactly the threshold, disparity should NOT bump to next slot
+    try testing.expectEqual(@as(Slot, 0), clock.currentSlotWithGossipDisparity());
+    try testing.expect(!clock.isCurrentSlotGivenGossipDisparity(1));
+
+    // 1ms closer (111_501): 112_000 - 111_501 = 499 < 500, disparity applies
+    fake.setMs(111_501);
+    clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(Slot, 1), clock.currentSlotWithGossipDisparity());
+    try testing.expect(clock.isCurrentSlotGivenGossipDisparity(1));
+}
+
+test "tolerance helpers" {
+    var fake = time_source.FakeTime{ .ms = 112_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?Slot, 2), clock.slotWithFutureTolerance(12_000));
+    try testing.expectEqual(@as(?Slot, 0), clock.slotWithPastTolerance(12_000));
+    try testing.expectEqual(@as(?Slot, null), clock.slotWithFutureTolerance(std.math.maxInt(u64)));
+    // Underflow (tolerance > now_ms) returns null, not 0
+    try testing.expectEqual(@as(?Slot, null), clock.slotWithPastTolerance(112_001));
+}
+
+test "secFromSlot and msFromSlot" {
+    var fake = time_source.FakeTime{ .ms = 118_000 };
+    var clock = try SlotClock.init(test_cfg, .{ .fake = &fake });
+    try testing.expectEqual(@as(?i64, 6), clock.secFromSlot(1, null));
+    try testing.expectEqual(@as(?i64, 6000), clock.msFromSlot(1, null));
+    try testing.expectEqual(@as(?i64, 0), clock.secFromSlot(1, 112));
+}
diff --git a/src/clock/root.zig b/src/clock/root.zig
new file mode 100644
index 000000000..f2ebe84a5
--- /dev/null
+++ b/src/clock/root.zig
@@ -0,0 +1,24 @@
+//! Zig beacon clock – slot/epoch timing for Ethereum consensus.
+//!
+//! Three-layer architecture:
+//!   Layer 0 (`slot_math`)  – pure arithmetic, comptime-compatible
+//!   Layer 1 (`SlotClock`)  – stateful clock with time source
+//!   Layer 2 (`EventClock`) – async event loop with listeners and waiters
+
+pub const slot_math = @import("slot_math.zig");
+pub const time_source = @import("time_source.zig");
+pub const SlotClock = @import("SlotClock.zig");
+pub const EventClock = @import("EventClock.zig");
+
+pub const Config = SlotClock.Config;
+pub const Slot = SlotClock.Slot;
+pub const Epoch = SlotClock.Epoch;
+
+pub const ListenerId = EventClock.ListenerId;
+pub const Error = EventClock.Error;
+
+test {
+    _ = slot_math;
+    _ = SlotClock;
+    _ = EventClock;
+}
diff --git a/src/clock/slot_math.zig b/src/clock/slot_math.zig
new file mode 100644
index 000000000..481b438f3
--- /dev/null
+++ b/src/clock/slot_math.zig
@@ -0,0 +1,254 @@
+//! Layer 0 – Pure slot/epoch arithmetic.
+//!
+//! No state, no allocation, no I/O. Every function is comptime-compatible.
+//! All overflow paths return `null` (`?T`) instead of panicking.
+
+const std = @import("std");
+
+// ── Type aliases ──────────────────────────────────────────────────────
+
+pub const Slot = u64;
+pub const Epoch = u64;
+pub const UnixMs = u64;
+pub const UnixSec = u64;
+
+// ── Config ────────────────────────────────────────────────────────────
+
+pub const Config = struct {
+    genesis_time_sec: UnixSec,
+    seconds_per_slot: u64,
+    slots_per_epoch: u64,
+    maximum_gossip_clock_disparity_ms: u64 = 500,
+
+    /// Validates that the config is usable (no zero divisors, no sec→ms overflow).
+    pub fn validate(self: Config) error{InvalidConfig}!void {
+        if (self.seconds_per_slot == 0 or self.slots_per_epoch == 0)
+            return error.InvalidConfig;
+        // Ensure sec→ms conversions used by msUntilNextSlot won't overflow at runtime.
+        if (secToMs(self.genesis_time_sec) == null) return error.InvalidConfig;
+        if (secToMs(self.seconds_per_slot) == null) return error.InvalidConfig;
+    }
+
+    /// Returns the slot duration in milliseconds, or null on overflow.
+    pub fn slotDurationMs(self: Config) ?u64 {
+        return secToMs(self.seconds_per_slot);
+    }
+};
+
+/// Returns the slot at the given Unix-millisecond timestamp,
+/// or null if pre-genesis or on overflow.
+pub fn slotAtMs(config: Config, now_ms: UnixMs) ?Slot {
+    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
+    if (now_ms < genesis_ms) return null;
+    const slot_ms = secToMs(config.seconds_per_slot) orelse return null;
+    if (slot_ms == 0) return null;
+    return @divFloor(now_ms - genesis_ms, slot_ms);
+}
+
+/// Returns the slot at the given Unix-second timestamp,
+/// or null if pre-genesis (or seconds_per_slot is zero).
+pub fn slotAtSec(config: Config, now_sec: UnixSec) ?Slot {
+    if (now_sec < config.genesis_time_sec) return null;
+    if (config.seconds_per_slot == 0) return null;
+    return @divFloor(now_sec - config.genesis_time_sec, config.seconds_per_slot);
+}
+
+/// Returns the epoch that contains `slot`, or null if slots_per_epoch is zero.
+pub fn epochAtSlot(config: Config, slot: Slot) ?Epoch {
+    if (config.slots_per_epoch == 0) return null;
+    return @divFloor(slot, config.slots_per_epoch);
+}
+
+/// Returns the Unix-second start time of `slot`, or null on overflow.
+pub fn slotStartSec(config: Config, slot: Slot) ?UnixSec {
+    const offset = std.math.mul(u64, slot, config.seconds_per_slot) catch return null;
+    return std.math.add(u64, config.genesis_time_sec, offset) catch return null;
+}
+
+/// Returns the Unix-millisecond start time of `slot`, or null on overflow.
+pub fn slotStartMs(config: Config, slot: Slot) ?UnixMs {
+    const sec = slotStartSec(config, slot) orelse return null;
+    return secToMs(sec);
+}
+
+/// Milliseconds until the next slot boundary (a boundary instant returns a full slot).
+/// Pre-genesis: returns the time until genesis.
+/// Returns null only on arithmetic overflow of the sec→ms conversions.
+pub fn msUntilNextSlot(config: Config, now_ms: UnixMs) ?u64 {
+    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
+    const slot_ms = secToMs(config.seconds_per_slot) orelse return null;
+    if (slot_ms == 0) return null;
+
+    if (now_ms < genesis_ms) return genesis_ms - now_ms;
+
+    const delta = now_ms - genesis_ms;
+    const rem = delta % slot_ms;
+    if (rem == 0) return slot_ms;
+    return slot_ms - rem;
+}
+
+fn secToMs(sec: u64) ?u64 {
+    return std.math.mul(u64, sec, 1000) catch return null;
+}
+
+const testing = std.testing;
+
+const mainnet = Config{
+    .genesis_time_sec = 1_606_824_023,
+    .seconds_per_slot = 12,
+    .slots_per_epoch = 32,
+};
+
+test "basic slot math" {
+    // slotAtSec: genesis is slot 0
+    try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec));
+    // slotAtSec: 12s after genesis is slot 1
+    try testing.expectEqual(@as(?Slot, 1), slotAtSec(mainnet, mainnet.genesis_time_sec + 12));
+    // slotAtSec: 24s after genesis is slot 2
+    try testing.expectEqual(@as(?Slot, 2), slotAtSec(mainnet, mainnet.genesis_time_sec + 24));
+
+    // slotAtMs at genesis
+    const genesis_ms = mainnet.genesis_time_sec * 1000;
+    try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms));
+    try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 12_000));
+
+    // epochAtSlot
+    try testing.expectEqual(@as(?Epoch, 0), epochAtSlot(mainnet, 0));
+    try testing.expectEqual(@as(?Epoch, 0), epochAtSlot(mainnet, 31));
+    try testing.expectEqual(@as(?Epoch, 1), epochAtSlot(mainnet, 32));
+    try testing.expectEqual(@as(?Epoch, 1), epochAtSlot(mainnet, 63));
+    try testing.expectEqual(@as(?Epoch, 2), epochAtSlot(mainnet, 64));
+
+    // slotStartSec round-trip
+    try testing.expectEqual(@as(?UnixSec, mainnet.genesis_time_sec), slotStartSec(mainnet, 0));
+    try testing.expectEqual(@as(?UnixSec, mainnet.genesis_time_sec + 12), slotStartSec(mainnet, 1));
+    try testing.expectEqual(@as(?UnixSec, mainnet.genesis_time_sec + 24), slotStartSec(mainnet, 2));
+
+    // slotStartMs round-trip
+    try testing.expectEqual(@as(?UnixMs, mainnet.genesis_time_sec * 1000), slotStartMs(mainnet, 0));
+    try testing.expectEqual(@as(?UnixMs, (mainnet.genesis_time_sec + 12) * 1000), slotStartMs(mainnet, 1));
+
+    // slotDurationMs
+    try testing.expectEqual(@as(?u64, 12_000), mainnet.slotDurationMs());
+}
+
+test "within-slot timing" {
+    // Seconds: mid-slot reads still return the current slot
+    try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 0));
+    try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 6));
+    try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 11));
+    // The boundary itself is the next slot
+    try testing.expectEqual(@as(?Slot, 1), slotAtSec(mainnet, mainnet.genesis_time_sec + 12));
+
+    // Milliseconds: mid-slot reads still return the current slot
+    const genesis_ms = mainnet.genesis_time_sec * 1000;
+    try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 1));
+    try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 6_000));
+    try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 11_999));
+    // Exact boundary is next slot
+    try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 12_000));
+    // Mid-way through slot 1
+    try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 18_000));
+    try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 23_999));
+    // Slot 2 boundary
+    try testing.expectEqual(@as(?Slot, 2), slotAtMs(mainnet, genesis_ms + 24_000));
+}
+
+test "overflow safety" {
+    // Pre-genesis returns null
+    try testing.expectEqual(@as(?Slot, null), slotAtSec(mainnet, mainnet.genesis_time_sec - 1));
+    try testing.expectEqual(@as(?Slot, null), slotAtSec(mainnet, 0));
+    try testing.expectEqual(@as(?Slot, null), slotAtMs(mainnet, 0));
+
+    // Huge slot overflows slotStartSec / slotStartMs
+    try testing.expectEqual(@as(?UnixSec, null), slotStartSec(mainnet, std.math.maxInt(u64)));
+    try testing.expectEqual(@as(?UnixMs, null), slotStartMs(mainnet, std.math.maxInt(u64)));
+
+    // Config with maxInt genesis_time_sec: slotAtMs gets null from secToMs overflow
+    const extreme = Config{
+        .genesis_time_sec = std.math.maxInt(u64),
+        .seconds_per_slot = 12,
+        .slots_per_epoch = 32,
+    };
+    try testing.expectEqual(@as(?Slot, null), slotAtMs(extreme, 0));
+    try testing.expectEqual(@as(?UnixSec, null), slotStartSec(extreme, 1));
+    try testing.expectEqual(@as(?UnixMs, null), slotStartMs(extreme, 0));
+
+    // slotDurationMs on extreme seconds_per_slot returns null
+    const big_slot = Config{
+        .genesis_time_sec = 0,
+        .seconds_per_slot = std.math.maxInt(u64),
+        .slots_per_epoch = 1,
+    };
+    try testing.expectEqual(@as(?u64, null), big_slot.slotDurationMs());
+
+    // msUntilNextSlot returns null when genesis overflows ms conversion
+    try testing.expectEqual(@as(?u64, null), msUntilNextSlot(extreme, 0));
+}
+
+test "msUntilNextSlot" {
+    const genesis_ms = mainnet.genesis_time_sec * 1000;
+    const slot_ms: u64 = 12_000;
+
+    // Exactly at genesis: full slot duration until next
+    try testing.expectEqual(@as(?u64, slot_ms), msUntilNextSlot(mainnet, genesis_ms));
+
+    // 1ms into genesis slot
+    try testing.expectEqual(@as(?u64, slot_ms - 1), msUntilNextSlot(mainnet, genesis_ms + 1));
+
+    // Half-way through a slot
+    try testing.expectEqual(@as(?u64, slot_ms - 6_000), msUntilNextSlot(mainnet, genesis_ms + 6_000));
+
+    // 1ms before slot boundary
+    try testing.expectEqual(@as(?u64, 1), msUntilNextSlot(mainnet, genesis_ms + slot_ms - 1));
+
+    // Exactly at slot 1 boundary: full slot duration until slot 2
+    try testing.expectEqual(@as(?u64, slot_ms), msUntilNextSlot(mainnet, genesis_ms + slot_ms));
+
+    // Pre-genesis: returns time until genesis
+    try testing.expectEqual(@as(?u64, 1_000), msUntilNextSlot(mainnet, genesis_ms - 1_000));
+    try testing.expectEqual(@as(?u64, genesis_ms), msUntilNextSlot(mainnet, 0));
+}
+
+test "config validate" {
+    // Valid config passes
+    try mainnet.validate();
+
+    // Zero seconds_per_slot is invalid
+    try testing.expectError(error.InvalidConfig, (Config{
+        .genesis_time_sec = 0,
+        .seconds_per_slot = 0,
+        .slots_per_epoch = 32,
+    }).validate());
+
+    // Zero slots_per_epoch is invalid
+    try testing.expectError(error.InvalidConfig, (Config{
+        .genesis_time_sec = 0,
+        .seconds_per_slot = 12,
+        .slots_per_epoch = 0,
+    }).validate());
+
+    // Both zero is invalid
+    try testing.expectError(error.InvalidConfig, (Config{
+        .genesis_time_sec = 0,
+        .seconds_per_slot = 0,
+        .slots_per_epoch = 0,
+    }).validate());
+
+    // Default maximum_gossip_clock_disparity_ms is 500
+    try testing.expectEqual(@as(u64, 500), mainnet.maximum_gossip_clock_disparity_ms);
+
+    // genesis_time_sec that overflows sec→ms is invalid
+    try testing.expectError(error.InvalidConfig, (Config{
+        .genesis_time_sec = std.math.maxInt(u64),
+        .seconds_per_slot = 12,
+        .slots_per_epoch = 32,
+    }).validate());
+
+    // seconds_per_slot that overflows sec→ms is invalid
+    try testing.expectError(error.InvalidConfig, (Config{
+        .genesis_time_sec = 0,
+        .seconds_per_slot = std.math.maxInt(u64),
+        .slots_per_epoch = 32,
+    }).validate());
+}
diff --git a/src/clock/time_source.zig b/src/clock/time_source.zig
new file mode 100644
index 000000000..f6e370668
--- /dev/null
+++ b/src/clock/time_source.zig
@@ -0,0 +1,48 @@
+//! Pluggable time source abstraction.
+//!
+//! Tagged union with two variants:
+//!   `.real` – production: reads wall-clock time from `std.Io`
+//!   `.fake` – testing: reads from a mutable `FakeTime` struct
+
+const std = @import("std");
+const slot_math = @import("slot_math.zig");
+
+/// Production clock backed by std.Io wall-clock time.
+pub const RealClock = struct {
+    io: std.Io,
+
+    pub fn nowMs(self: RealClock) slot_math.UnixMs {
+        const ms = std.Io.Clock.real.now(self.io).toMilliseconds();
+        std.debug.assert(ms >= 0); // NOTE(review): a pre-1970 wall clock trips this assert in safe builds only
+        return @intCast(ms); // NOTE(review): in ReleaseFast a negative ms makes this @intCast UB — consider clamping to 0; confirm toMilliseconds() semantics
+    }
+};
+
+/// Controllable time source for deterministic testing.
+pub const FakeTime = struct {
+    ms: slot_math.UnixMs = 0,
+
+    pub fn setMs(self: *FakeTime, ms: slot_math.UnixMs) void {
+        self.ms = ms;
+    }
+
+    pub fn advanceMs(self: *FakeTime, delta: u64) void {
+        self.ms += delta; // plain add: traps on overflow in safe builds
+    }
+
+    pub fn advanceSlot(self: *FakeTime, config: slot_math.Config) void {
+        self.ms += config.slotDurationMs() orelse return; // silently no-ops when the slot duration overflows sec→ms
+    }
+};
+
+pub const TimeSource = union(enum) {
+    real: RealClock,
+    fake: *FakeTime,
+
+    pub fn nowMs(self: TimeSource) slot_math.UnixMs {
+        return switch (self) {
+            .real => |c| c.nowMs(),
+            .fake => |f| f.ms,
+        };
+    }
+};