diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 96d271f80..8a3974949 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -87,6 +87,9 @@ jobs: - name: Run fork-choice tests run: | zig build test:fork_choice + - name: Run clock tests + run: | + zig build test:clock - name: Build benchmarks run: | diff --git a/build.zig.zon b/build.zig.zon index 5d354e5e8..e01d4d640 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -53,6 +53,10 @@ // .url = "git+https://github.com/karlseguin/http.zig.git#2924ead41db9e1986db3b4909a82185c576f6361", // .hash = "httpz-0.0.0-PNVzrC7CBgDSMZNP7chgAczR3FyAz5Eum6Kb1_K9kmYR", // }, + .zio = .{ + .url = "git+https://github.com/lalinsky/zio?ref=v0.11.0#53abdf9ce5e6279e29703a39180ce1cedd8ac4ca", + .hash = "zio-0.11.0-xHbVVEqQGwDz0MYoQ0j7Ke0Y9E_iZVe5eLAWK4gd9U0i", + }, }, .options_modules = .{ .build_options = .{ @@ -132,6 +136,13 @@ .time = .{ .root_source_file = "src/time.zig", }, + .bounded_array = .{ + .root_source_file = "src/bounded_array.zig", + }, + .clock = .{ + .root_source_file = "src/clock/root.zig", + .imports = .{ .zio, .consensus_types, .bounded_array }, + }, .hashing = .{ .root_source_file = "src/hashing/root.zig", .imports = .{ .build_options, .hex, .hashtree }, @@ -341,6 +352,7 @@ // Per-module unit tests (mirror the old auto-generated `test:` steps). 
.constants = .{ .root_module = .constants }, .hex = .{ .root_module = .hex }, + .bounded_array = .{ .root_module = .bounded_array }, .hashing = .{ .root_module = .hashing }, .preset = .{ .root_module = .preset }, .persistent_merkle_tree = .{ .root_module = .persistent_merkle_tree }, @@ -352,6 +364,7 @@ .state_transition = .{ .root_module = .state_transition }, .era = .{ .root_module = .era }, .fork_choice = .{ .root_module = .fork_choice }, + .clock = .{ .root_module = .clock }, .int = .{ .root_module = .{ diff --git a/src/bounded_array.zig b/src/bounded_array.zig new file mode 100644 index 000000000..34af2e668 --- /dev/null +++ b/src/bounded_array.zig @@ -0,0 +1,148 @@ +//! Stack-resident array with a compile-time hard capacity. No allocator; +//! runtime ops are infallible — over-capacity is a programming error and +//! asserts rather than returning an error. + +const std = @import("std"); + +pub fn BoundedArray(comptime T: type, comptime capacity_: u32) type { + return struct { + buffer: [capacity_]T = undefined, + count: u32 = 0, + + const Self = @This(); + pub const capacity: u32 = capacity_; + + pub fn slice(self: *Self) []T { + return self.buffer[0..self.count]; + } + + pub fn constSlice(self: *const Self) []const T { + return self.buffer[0..self.count]; + } + + pub fn full(self: *const Self) bool { + return self.count == capacity; + } + + pub fn empty(self: *const Self) bool { + return self.count == 0; + } + + pub fn push(self: *Self, item: T) void { + std.debug.assert(!self.full()); + self.buffer[self.count] = item; + self.count += 1; + } + + pub fn pop(self: *Self) ?T { + if (self.empty()) return null; + self.count -= 1; + return self.buffer[self.count]; + } + + pub fn orderedRemove(self: *Self, i: u32) void { + std.debug.assert(i < self.count); + var j: u32 = i; + while (j + 1 < self.count) : (j += 1) { + self.buffer[j] = self.buffer[j + 1]; + } + self.count -= 1; + } + + pub fn clear(self: *Self) void { + self.count = 0; + } + }; +} + +const testing = 
std.testing; + +test "default-constructed is empty" { + const A = BoundedArray(u32, 4); + var a: A = .{}; + try testing.expect(a.empty()); + try testing.expect(!a.full()); + try testing.expectEqual(@as(u32, 0), a.count); + try testing.expectEqual(@as(u32, 4), A.capacity); + try testing.expectEqual(@as(usize, 0), a.slice().len); +} + +test "push fills up to capacity" { + var a: BoundedArray(u32, 3) = .{}; + a.push(10); + a.push(20); + a.push(30); + try testing.expect(a.full()); + try testing.expectEqualSlices(u32, &.{ 10, 20, 30 }, a.slice()); +} + +test "pop returns elements in LIFO order" { + var a: BoundedArray(u32, 3) = .{}; + a.push(10); + a.push(20); + try testing.expectEqual(@as(?u32, 20), a.pop()); + try testing.expectEqual(@as(?u32, 10), a.pop()); + try testing.expectEqual(@as(?u32, null), a.pop()); +} + +test "orderedRemove shifts trailing elements down" { + var a: BoundedArray(u32, 4) = .{}; + a.push(1); + a.push(2); + a.push(3); + a.push(4); + a.orderedRemove(1); + try testing.expectEqualSlices(u32, &.{ 1, 3, 4 }, a.slice()); + a.orderedRemove(0); + try testing.expectEqualSlices(u32, &.{ 3, 4 }, a.slice()); + a.orderedRemove(1); + try testing.expectEqualSlices(u32, &.{3}, a.slice()); +} + +test "clear resets count without touching buffer" { + var a: BoundedArray(u32, 3) = .{}; + a.push(1); + a.push(2); + a.clear(); + try testing.expect(a.empty()); + try testing.expectEqualSlices(u32, &.{}, a.slice()); +} + +test "slice allows in-place mutation" { + var a: BoundedArray(u32, 3) = .{}; + a.push(1); + a.push(2); + a.push(3); + for (a.slice()) |*x| x.* *= 10; + try testing.expectEqualSlices(u32, &.{ 10, 20, 30 }, a.constSlice()); +} + +test "constSlice from const receiver returns read-only view" { + var a: BoundedArray(u32, 3) = .{}; + a.push(1); + a.push(2); + const ptr: *const BoundedArray(u32, 3) = &a; + const view = ptr.constSlice(); + try testing.expectEqual(@as(usize, 2), view.len); + try testing.expectEqualSlices(u32, &.{ 1, 2 }, view); +} + +test 
"struct payload" { + const Entry = struct { id: u64, value: i32 }; + var a: BoundedArray(Entry, 2) = .{}; + a.push(.{ .id = 1, .value = -5 }); + a.push(.{ .id = 2, .value = 7 }); + try testing.expectEqual(@as(u64, 1), a.slice()[0].id); + try testing.expectEqual(@as(i32, 7), a.slice()[1].value); +} + +test "comptime construction" { + const a = comptime blk: { + var arr: BoundedArray(u32, 3) = .{}; + arr.push(1); + arr.push(2); + break :blk arr; + }; + try testing.expectEqual(@as(u32, 2), a.count); + try testing.expectEqualSlices(u32, &.{ 1, 2 }, a.constSlice()); +} diff --git a/src/clock/EventClock.zig b/src/clock/EventClock.zig new file mode 100644 index 000000000..c89f890c0 --- /dev/null +++ b/src/clock/EventClock.zig @@ -0,0 +1,1273 @@ +//! Layer 2 – Event-driven beacon clock. +//! +//! Combines `SlotClock` with an async I/O loop to emit slot/epoch events +//! and dispatch waiters. All public methods are safe to call from the +//! main thread; the internal loop runs as a single cooperative fiber. +//! +//! Designed for a cooperative single-fiber `std.Io` backend (e.g. zio). +//! `start()` and `waitForSlot()` use `std.Io.concurrent` so a backend +//! that can't guarantee concurrent execution surfaces as +//! `error.ConcurrencyUnavailable` rather than deadlocking. 
+ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const bounded_array = @import("bounded_array"); +const slot_math = @import("slot_math.zig"); +const SlotClock = @import("SlotClock.zig"); +const time_source = @import("time_source.zig"); + +const EventClock = @This(); + +allocator: Allocator, +io: std.Io, +clock: SlotClock, + +stopped: bool = false, +loop_future: ?std.Io.Future(void) = null, + +next_listener_id: ListenerId = 1, +slot_listeners: bounded_array.BoundedArray(SlotListenerEntry, max_slot_listeners) = .{}, +epoch_listeners: bounded_array.BoundedArray(EpochListenerEntry, max_epoch_listeners) = .{}, +slot_snapshot: bounded_array.BoundedArray(SlotSnapshot, max_slot_listeners) = .{}, +epoch_snapshot: bounded_array.BoundedArray(EpochSnapshot, max_epoch_listeners) = .{}, + +waiters: WaiterQueue, + +pub const Slot = slot_math.Slot; +pub const Epoch = slot_math.Epoch; +pub const Config = slot_math.Config; +pub const ListenerId = u64; +pub const TimeSource = time_source.TimeSource; + +pub const max_slot_listeners: u32 = 16; +pub const max_epoch_listeners: u32 = 16; +pub const max_waiters: u32 = 1024; + +pub const Error = error{ + InvalidConfig, + OutOfMemory, + ListenerLimitReached, + WaiterLimitReached, + Aborted, + ConcurrencyUnavailable, +}; + +const WaitState = struct { + io: std.Io, + allocator: Allocator, + event: std.Io.Event = .unset, + aborted: bool = false, +}; + +const WaiterEntry = struct { + target: Slot, + state: *WaitState, +}; + +const SlotListenerEntry = struct { + id: ListenerId, + callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, + ctx: ?*anyopaque, +}; + +const EpochListenerEntry = struct { + id: ListenerId, + callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, + ctx: ?*anyopaque, +}; + +const SlotSnapshot = struct { + callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, + ctx: ?*anyopaque, +}; + +const EpochSnapshot = struct { + callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, + ctx: ?*anyopaque, 
+}; + +const WaiterQueue = std.PriorityQueue(WaiterEntry, void, struct { + fn compare(_: void, a: WaiterEntry, b: WaiterEntry) std.math.Order { + return std.math.order(a.target, b.target); + } +}.compare); + +pub fn init(self: *EventClock, allocator: Allocator, config: Config, io_handle: std.Io) Error!void { + self.* = .{ + .allocator = allocator, + .io = io_handle, + .clock = undefined, + .waiters = WaiterQueue.initContext({}), + }; + self.clock = SlotClock.init(config, .{ .real = .{ .io = io_handle } }) catch return error.InvalidConfig; +} + +/// Start the auto-advance loop. Idempotent; second call is a no-op. +pub fn start(self: *EventClock) Error!void { + if (self.loop_future != null) return; + self.loop_future = std.Io.concurrent(self.io, EventClock.runAutoLoop, .{self}) catch + return error.ConcurrencyUnavailable; +} + +/// Signal the loop to stop and abort all pending waiters. Idempotent. +pub fn stop(self: *EventClock) void { + if (self.stopped) return; + self.stopped = true; + self.abortAllWaiters(); +} + +/// Signal the loop to stop, cancel the fiber, and wait for it to finish. +pub fn join(self: *EventClock) void { + self.stop(); + var maybe_future = self.loop_future; + self.loop_future = null; + if (maybe_future) |*future| { + future.cancel(self.io); + future.await(self.io); + } +} + +/// Release all resources. Calls `stop()` + `join()` internally. +pub fn deinit(self: *EventClock) void { + self.stop(); + self.join(); + self.waiters.deinit(self.allocator); + self.* = undefined; +} + +// Inside a callback, `offSlot` / `offEpoch` are safe; `onSlot` / `onEpoch` +// are not — they may overwrite the snapshot iterated by the active emit. + +/// Register a slot listener. Returns an ID for later removal via `offSlot`. 
+pub fn onSlot( + self: *EventClock, + callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, + ctx: ?*anyopaque, +) Error!ListenerId { + if (self.slot_listeners.full()) return error.ListenerLimitReached; + self.slot_listeners.push(.{ + .id = self.next_listener_id, + .callback = callback, + .ctx = ctx, + }); + const id = self.next_listener_id; + self.next_listener_id += 1; + return id; +} + +/// Unregister a slot listener. Returns `true` if found and removed. +pub fn offSlot(self: *EventClock, id: ListenerId) bool { + for (self.slot_listeners.slice(), 0..) |listener, i| { + if (listener.id == id) { + self.slot_listeners.orderedRemove(@intCast(i)); + return true; + } + } + return false; +} + +/// Register an epoch listener. Returns an ID for later removal via `offEpoch`. +pub fn onEpoch( + self: *EventClock, + callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, + ctx: ?*anyopaque, +) Error!ListenerId { + if (self.epoch_listeners.full()) return error.ListenerLimitReached; + self.epoch_listeners.push(.{ + .id = self.next_listener_id, + .callback = callback, + .ctx = ctx, + }); + const id = self.next_listener_id; + self.next_listener_id += 1; + return id; +} + +/// Unregister an epoch listener. Returns `true` if found and removed. +pub fn offEpoch(self: *EventClock, id: ListenerId) bool { + for (self.epoch_listeners.slice(), 0..) |listener, i| { + if (listener.id == id) { + self.epoch_listeners.orderedRemove(@intCast(i)); + return true; + } + } + return false; +} + +// Accessors that expose "current" slot/epoch state call catchUp() first so +// reads emit any pending events before returning. Pure time-arithmetic +// helpers (slotWithFutureTolerance, secFromSlot, …) do not. 
+ +pub fn currentSlot(self: *EventClock) ?Slot { + self.catchUp(); + return self.clock.currentSlot(); +} + +pub fn currentEpoch(self: *EventClock) ?Epoch { + self.catchUp(); + return self.clock.currentEpoch(); +} + +pub fn currentSlotOrGenesis(self: *EventClock) Slot { + self.catchUp(); + return self.clock.currentSlotOrGenesis(); +} + +pub fn currentEpochOrGenesis(self: *EventClock) Epoch { + self.catchUp(); + return self.clock.currentEpochOrGenesis(); +} + +pub fn currentSlotWithGossipDisparity(self: *EventClock) Slot { + self.catchUp(); + return self.clock.currentSlotWithGossipDisparity(); +} + +pub fn isCurrentSlotGivenGossipDisparity(self: *EventClock, slot: Slot) bool { + self.catchUp(); + return self.clock.isCurrentSlotGivenGossipDisparity(slot); +} + +pub fn slotWithFutureTolerance(self: *EventClock, tolerance_ms: u64) ?Slot { + return self.clock.slotWithFutureTolerance(tolerance_ms); +} + +pub fn slotWithPastTolerance(self: *EventClock, tolerance_ms: u64) ?Slot { + return self.clock.slotWithPastTolerance(tolerance_ms); +} + +pub fn secFromSlot(self: *EventClock, slot: Slot, to_sec: ?u64) ?i64 { + return self.clock.secFromSlot(slot, to_sec); +} + +pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?u64) ?i64 { + return self.clock.msFromSlot(slot, to_ms); +} + +/// Return type from `waitForSlot`. The caller MUST either: +/// - call `await()` to wait for the target slot and release resources, OR +/// - call `cancel()` to abort and release resources, OR +/// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`. +/// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks +/// the internal WaitState. 
+/// +/// Idiomatic usage with `errdefer`: +/// var fut = try ec.waitForSlot(target); +/// errdefer fut.cancel(); +/// try fut.await(io); +pub const WaitForSlotResult = struct { + inner: std.Io.Future(Error!void), + state: ?*WaitState, + clock: ?*EventClock, + + /// Create an immediately-resolved result (no async work needed). + /// Relies on `std.Io.Future.await` returning `.result` when `.any_future == null`. + fn immediate(result: Error!void) WaitForSlotResult { + return .{ .inner = .{ .any_future = null, .result = result }, .state = null, .clock = null }; + } + + pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void { + const result = self.inner.await(io); + // Free state only AFTER the fiber returns, so it can't observe a + // freed `state.aborted` between event-wake and its own return. + if (self.state) |s| s.allocator.destroy(s); + self.state = null; + self.clock = null; + return result; + } + + /// Abort a pending wait and release its resources. Idempotent — safe + /// to call on an already-awaited, already-cancelled, or immediate result. + pub fn cancel(self: *WaitForSlotResult) void { + const state = self.state orelse return; + // Remove from waiter queue before freeing, so abortAllWaiters + // won't dereference the freed state pointer. + if (self.clock) |clock| { + for (clock.waiters.items, 0..) |entry, i| { + if (entry.state == state) { + _ = clock.waiters.popIndex(i); + break; + } + } + } + state.aborted = true; + state.event.set(state.io); + // Must await the fiber so it finishes before we free its state. + // The fiber returns error.Aborted (expected) or {} (already dispatched). + _ = self.inner.await(state.io) catch |err| { + std.debug.assert(err == error.Aborted); + }; + state.allocator.destroy(state); + self.state = null; + self.clock = null; + } +}; + +/// Return a future that resolves when the clock reaches `target`. +/// See `WaitForSlotResult` for the caller's obligations. 
+pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult { + if (self.stopped) { + return WaitForSlotResult.immediate(error.Aborted); + } + self.catchUp(); + if (self.clock.current_slot) |slot| { + if (slot >= target) { + return WaitForSlotResult.immediate({}); + } + } + if (self.waiters.count() >= max_waiters) { + return error.WaiterLimitReached; + } + + const state = self.allocator.create(WaitState) catch return error.OutOfMemory; + errdefer self.allocator.destroy(state); + + state.* = .{ + .io = self.io, + .allocator = self.allocator, + }; + + if (self.stopped) { + self.allocator.destroy(state); + return WaitForSlotResult.immediate(error.Aborted); + } + self.waiters.push(self.allocator, .{ .target = target, .state = state }) catch return error.OutOfMemory; + self.dispatchWaiters(self.clock.current_slot); + + const inner = std.Io.concurrent(self.io, waitForSlotFutureAwait, .{state}) catch { + for (self.waiters.items, 0..) |entry, i| { + if (entry.state == state) { + _ = self.waiters.popIndex(i); + break; + } + } + return error.ConcurrencyUnavailable; + }; + + return .{ + .inner = inner, + .state = state, + .clock = self, + }; +} + +/// Ensure event-clock state is caught up to wall-clock time. +/// Emits any intermediate slot/epoch events to listeners. +/// No-op if already caught up or pre-genesis (currentSlot() returns null). 
+fn catchUp(self: *EventClock) void { + if (self.clock.currentSlot()) |wall_slot| { + self.advanceAndDispatch(wall_slot); + } +} + +fn emitSlot(self: *EventClock, slot: Slot) void { + self.slot_snapshot.clear(); + for (self.slot_listeners.slice()) |listener| { + self.slot_snapshot.push(.{ + .callback = listener.callback, + .ctx = listener.ctx, + }); + } + for (self.slot_snapshot.slice()) |listener| { + listener.callback(listener.ctx, slot); + } +} + +fn emitEpoch(self: *EventClock, epoch: Epoch) void { + self.epoch_snapshot.clear(); + for (self.epoch_listeners.slice()) |listener| { + self.epoch_snapshot.push(.{ + .callback = listener.callback, + .ctx = listener.ctx, + }); + } + for (self.epoch_snapshot.slice()) |listener| { + listener.callback(listener.ctx, epoch); + } +} + +fn dispatchWaiters(self: *EventClock, current_slot: ?Slot) void { + const slot = current_slot orelse return; + while (self.waiters.peek()) |head| { + if (head.target > slot) break; + const waiter = self.waiters.pop().?; + waiter.state.aborted = false; + waiter.state.event.set(waiter.state.io); + } +} + +fn abortAllWaiters(self: *EventClock) void { + while (self.waiters.pop()) |waiter| { + waiter.state.aborted = true; + waiter.state.event.set(waiter.state.io); + } +} + +fn advanceAndDispatch(self: *EventClock, target: Slot) void { + var iter = self.clock.advanceTo(target); + // Check `stopped` *before* iter.next() so a callback that calls stop() + // can't leave current_slot one ahead of the last-emitted slot. + while (true) { + if (self.stopped) break; + const event = iter.next() orelse break; + switch (event) { + .slot => |s| { + self.emitSlot(s); + self.dispatchWaiters(s); + }, + .epoch => |e| self.emitEpoch(e), + } + } + // Defensive: handles edge cases where advanceTo yields zero events + // (already at target) but waiters were added between loop ticks. + // In the normal case, this is a no-op because the last .slot event + // already dispatched waiters at the same slot value. 
+ self.dispatchWaiters(self.clock.current_slot); +} + +fn runAutoLoop(self: *EventClock) void { + while (!self.stopped) { + const now_ms = self.clock.time.nowMs(); + // Config validation guarantees sec→ms won't overflow, so null here + // indicates a logic bug. Break instead of spinning at 1ms. + const next_ms = slot_math.msUntilNextSlot(self.clock.config, now_ms) orelse { + std.log.err("EventClock: msUntilNextSlot returned null (config overflow?), stopping loop", .{}); + self.stop(); + break; + }; + const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64); + + // Sleep failure: cancellation (from join()) exits the loop; + // other errors re-check the stopped flag. + std.Io.sleep( + self.io, + std.Io.Duration.fromMilliseconds(sleep_ms), + .awake, + ) catch |err| { + if (err == error.Canceled) break; + std.log.debug("EventClock: sleep failed ({s}), retrying", .{@errorName(err)}); + continue; + }; + + if (self.stopped) break; + // Only advance after genesis. Before genesis currentSlot() returns + // null — skipping here prevents emitting slot 0 prematurely. + if (self.clock.currentSlot()) |slot| { + self.advanceAndDispatch(slot); + } + } + // Non-terminating event loop: exits only when `self.stopped` is set. + // - normal stop(): sets flag, next iteration's `!self.stopped` exits + // - overflow path: in-body break after self.stop() + // - join(): always calls stop() before cancelling the fiber, so the + // `error.Canceled` break also satisfies stopped == true + std.debug.assert(self.stopped); +} + +fn waitForSlotFutureAwait(state: *WaitState) Error!void { + // Do NOT free state here — `state.aborted` is read after the wake, + // and the caller (`WaitForSlotResult.await`) frees only once this fiber + // has fully returned. 
+ state.event.waitUncancelable(state.io); + if (state.aborted) return error.Aborted; +} + +const testing = std.testing; +const zio = @import("zio"); + +fn runInRuntime(comptime body: anytype) !void { + const rt = try zio.Runtime.init(testing.allocator, .{}); + defer rt.deinit(); + + var handle = try rt.spawn(body, .{rt.io()}); + try handle.join(); +} + +fn nowSecAt(io_handle: std.Io) u64 { + const sec = std.Io.Clock.real.now(io_handle).toSeconds(); + std.debug.assert(sec >= 0); + return @intCast(sec); +} + +fn nowMsAt(io_handle: std.Io) u64 { + const ms = std.Io.Clock.real.now(io_handle).toMilliseconds(); + std.debug.assert(ms >= 0); + return @intCast(ms); +} + +const EventTraceState = struct { + slots: [64]Slot = undefined, + slot_len: usize = 0, + epochs: [64]u64 = undefined, + epoch_len: usize = 0, + + fn onSlot(ctx: ?*anyopaque, slot: Slot) void { + const self: *EventTraceState = @ptrCast(@alignCast(ctx.?)); + if (self.slot_len >= self.slots.len) return; + self.slots[self.slot_len] = slot; + self.slot_len += 1; + } + + fn onEpoch(ctx: ?*anyopaque, epoch: u64) void { + const self: *EventTraceState = @ptrCast(@alignCast(ctx.?)); + if (self.epoch_len >= self.epochs.len) return; + self.epochs[self.epoch_len] = epoch; + self.epoch_len += 1; + } +}; + +test "lifecycle: init -> register -> start -> receive events -> stop" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + try clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); + try fut.await(io_handle); + + try testing.expect(trace.slot_len > 0); + } + }.run); +} + +test 
"waitForSlot resolves immediately when at target" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + const current = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(current); + errdefer fut.cancel(); + try fut.await(io_handle); + } + }.run); +} + +test "waitForSlot returns aborted on stop" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .slot_duration_ms = 2_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var fut = try clock.waitForSlot(100); + errdefer fut.cancel(); + clock.stop(); + try testing.expectError(error.Aborted, fut.await(io_handle)); + } + }.run); +} + +test "offSlot/offEpoch stop event delivery" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + const slot_id = try clock.onSlot(EventTraceState.onSlot, &trace); + const epoch_id = try clock.onEpoch(EventTraceState.onEpoch, &trace); + try testing.expect(clock.offSlot(slot_id)); + try testing.expect(clock.offEpoch(epoch_id)); + + clock.advanceAndDispatch(6); + try testing.expectEqual(@as(usize, 0), trace.slot_len); + try testing.expectEqual(@as(usize, 0), trace.epoch_len); + } + }.run); +} + +test "stop/join are idempotent" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = 
nowSecAt(io_handle) + 2, + .slot_duration_ms = 2_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + clock.stop(); + clock.stop(); + clock.join(); + clock.join(); + } + }.run); +} + +test "epoch event is delivered when crossing epoch boundary" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 2, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + _ = try clock.onEpoch(EventTraceState.onEpoch, &trace); + + clock.advanceAndDispatch(5); + + try testing.expect(trace.slot_len > 0); + try testing.expect(trace.epoch_len > 0); + try testing.expectEqual(@as(u64, 1), trace.epochs[0]); + } + }.run); +} + +test "multiple waiters are dispatched in target-slot order" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 10, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var fut5 = try clock.waitForSlot(5); + errdefer fut5.cancel(); + + var fut3 = try clock.waitForSlot(3); + errdefer fut3.cancel(); + + var fut1 = try clock.waitForSlot(1); + errdefer fut1.cancel(); + + clock.advanceAndDispatch(3); + + try fut1.await(io_handle); + try fut3.await(io_handle); + + clock.stop(); + try testing.expectError(error.Aborted, fut5.await(io_handle)); + } + }.run); +} + +test "cancel releases WaitState without awaiting" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 10, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + // testing.allocator 
detects a leak if cancel fails to free. + var fut = try clock.waitForSlot(999); + fut.cancel(); + } + }.run); +} + +test "real-time: no slot events emitted before genesis" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 5, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + try clock.start(); + + std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(1500), .awake) catch {}; + + try testing.expectEqual(@as(usize, 0), trace.slot_len); + } + }.run); +} + +test "real-time: slot events fire with correct timing" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + try clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + const before_ms = nowMsAt(io_handle); + var fut = try clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); + try fut.await(io_handle); + const elapsed = nowMsAt(io_handle) - before_ms; + + try testing.expect(elapsed < 2000); + try testing.expect(trace.slot_len > 0); + try testing.expect(trace.slots[trace.slot_len - 1] >= start_slot + 1); + } + }.run); +} + +test "real-time: multi-slot advancement delivers ordered events" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .slot_duration_ms = 1_000, + .slots_per_epoch = 8, + }, io_handle); + defer 
clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + + try clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(start_slot + 2); + errdefer fut.cancel(); + try fut.await(io_handle); + + try testing.expect(trace.slot_len >= 2); + for (1..trace.slot_len) |i| { + try testing.expect(trace.slots[i] > trace.slots[i - 1]); + } + } + }.run); +} + +test "real-time: stop+join cancels promptly" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 100, + .slot_duration_ms = 12_000, + .slots_per_epoch = 32, + }, io_handle); + defer clock.deinit(); + + try clock.start(); + + // Give the loop fiber time to enter its sleep. + std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {}; + + const before_ms = nowMsAt(io_handle); + clock.stop(); + clock.join(); + const elapsed = nowMsAt(io_handle) - before_ms; + + // join() cancels the sleeping future directly — should return + // almost immediately, NOT after the full 12-second slot. 
+ try testing.expect(elapsed < 1500); + } + }.run); +} + +test "real-time: epoch boundary event fires" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + const base_now = nowSecAt(io_handle); + + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = base_now, + .slot_duration_ms = 1_000, + .slots_per_epoch = 2, + }, io_handle); + defer clock.deinit(); + + var trace = EventTraceState{}; + _ = try clock.onSlot(EventTraceState.onSlot, &trace); + _ = try clock.onEpoch(EventTraceState.onEpoch, &trace); + + try clock.start(); + + const start_slot = clock.currentSlotOrGenesis(); + var fut = try clock.waitForSlot(start_slot + 3); + errdefer fut.cancel(); + try fut.await(io_handle); + + try testing.expect(trace.slot_len >= 3); + try testing.expect(trace.epoch_len > 0); + } + }.run); +} + +fn nopSlot(_: ?*anyopaque, _: Slot) void {} +fn nopEpoch(_: ?*anyopaque, _: Epoch) void {} + +const ReentrancyCtx = struct { + clock: *EventClock, + self_id: ?ListenerId = null, + fired_count: usize = 0, + + fn offSelf(ctx: ?*anyopaque, _: Slot) void { + const self: *ReentrancyCtx = @ptrCast(@alignCast(ctx.?)); + self.fired_count += 1; + if (self.self_id) |id| { + _ = self.clock.offSlot(id); + self.self_id = null; + } + } + + fn stopClock(ctx: ?*anyopaque, _: Slot) void { + const self: *ReentrancyCtx = @ptrCast(@alignCast(ctx.?)); + self.fired_count += 1; + self.clock.stop(); + } + + fn justCount(ctx: ?*anyopaque, _: Slot) void { + const self: *ReentrancyCtx = @ptrCast(@alignCast(ctx.?)); + self.fired_count += 1; + } +}; + +test "reentrancy: callback can offSlot itself mid-dispatch" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var ctx_a = ReentrancyCtx{ .clock = &clock }; + var ctx_b = 
ReentrancyCtx{ .clock = &clock }; + const id_a = try clock.onSlot(ReentrancyCtx.offSelf, &ctx_a); + ctx_a.self_id = id_a; + _ = try clock.onSlot(ReentrancyCtx.justCount, &ctx_b); + + clock.advanceAndDispatch(0); + clock.advanceAndDispatch(2); + + try testing.expectEqual(@as(usize, 1), ctx_a.fired_count); + try testing.expectEqual(@as(usize, 3), ctx_b.fired_count); + } + }.run); +} + +test "reentrancy: callback can stop the clock; no further slots emitted" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var ctx = ReentrancyCtx{ .clock = &clock }; + _ = try clock.onSlot(ReentrancyCtx.stopClock, &ctx); + + clock.advanceAndDispatch(5); + + try testing.expectEqual(@as(usize, 1), ctx.fired_count); + try testing.expect(clock.stopped); + try testing.expectEqual(@as(?Slot, 0), clock.clock.current_slot); + } + }.run); +} + +test "ListenerLimitReached: onSlot/onEpoch reject the (limit+1)th registration" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + for (0..max_slot_listeners) |_| { + _ = try clock.onSlot(nopSlot, null); + } + try testing.expectError(error.ListenerLimitReached, clock.onSlot(nopSlot, null)); + + for (0..max_epoch_listeners) |_| { + _ = try clock.onEpoch(nopEpoch, null); + } + try testing.expectError(error.ListenerLimitReached, clock.onEpoch(nopEpoch, null)); + } + }.run); +} + +test "WaiterLimitReached: waitForSlot rejects the (limit+1)th waiter" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try 
clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + var futs: [max_waiters]WaitForSlotResult = undefined; + for (&futs) |*f| f.* = try clock.waitForSlot(999_999); + try testing.expectError(error.WaiterLimitReached, clock.waitForSlot(999_999)); + for (&futs) |*f| f.cancel(); + } + }.run); +} + +test "many waiters at same target slot all resolve on advance" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var clock: EventClock = undefined; + try clock.init(testing.allocator, .{ + .genesis_time_sec = nowSecAt(io_handle) + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = 4, + }, io_handle); + defer clock.deinit(); + + const N = 16; + var futs: [N]WaitForSlotResult = undefined; + for (&futs) |*f| f.* = try clock.waitForSlot(5); + + clock.advanceAndDispatch(5); + + for (&futs) |*f| try f.await(io_handle); + } + }.run); +} + +const PropertyTracker = struct { + slot_events: std.ArrayListUnmanaged(Slot) = .empty, + epoch_events: std.ArrayListUnmanaged(Epoch) = .empty, + + fn onSlot(ctx: ?*anyopaque, slot: Slot) void { + const self: *PropertyTracker = @ptrCast(@alignCast(ctx.?)); + self.slot_events.append(testing.allocator, slot) catch unreachable; + } + + fn onEpoch(ctx: ?*anyopaque, epoch: Epoch) void { + const self: *PropertyTracker = @ptrCast(@alignCast(ctx.?)); + self.epoch_events.append(testing.allocator, epoch) catch unreachable; + } + + fn deinit(self: *PropertyTracker) void { + self.slot_events.deinit(testing.allocator); + self.epoch_events.deinit(testing.allocator); + } +}; + +const PropertyOp = union(enum) { + on_slot, + on_epoch, + off_slot: usize, + off_epoch: usize, + advance_by: u8, + wait_for_slot_at_offset: i32, + cancel_waiter: usize, + stop, +}; + +const PropertyWaiter = struct { + target: Slot, + fut: WaitForSlotResult, + expected_aborted: bool, +}; + +const PropertyState = struct { + spe: 
u64, + model_current_slot: ?Slot = null, + model_stopped: bool = false, + clock: *EventClock, + + slot_listener_ids: std.ArrayListUnmanaged(ListenerId) = .empty, + slot_trackers: std.ArrayListUnmanaged(*PropertyTracker) = .empty, + slot_expected: std.ArrayListUnmanaged(std.ArrayListUnmanaged(Slot)) = .empty, + + epoch_listener_ids: std.ArrayListUnmanaged(ListenerId) = .empty, + epoch_trackers: std.ArrayListUnmanaged(*PropertyTracker) = .empty, + epoch_expected: std.ArrayListUnmanaged(std.ArrayListUnmanaged(Epoch)) = .empty, + + waiters: std.ArrayListUnmanaged(PropertyWaiter) = .empty, + + const MAX_LISTENERS = 8; + + fn deinit(self: *PropertyState) void { + const a = testing.allocator; + for (self.slot_trackers.items) |t| { + t.deinit(); + a.destroy(t); + } + for (self.epoch_trackers.items) |t| { + t.deinit(); + a.destroy(t); + } + for (self.slot_expected.items) |*lst| lst.deinit(a); + for (self.epoch_expected.items) |*lst| lst.deinit(a); + self.slot_listener_ids.deinit(a); + self.slot_trackers.deinit(a); + self.slot_expected.deinit(a); + self.epoch_listener_ids.deinit(a); + self.epoch_trackers.deinit(a); + self.epoch_expected.deinit(a); + self.waiters.deinit(a); + } + + fn applyOp(self: *PropertyState, op: PropertyOp) !void { + const a = testing.allocator; + switch (op) { + .on_slot => { + if (self.slot_listener_ids.items.len >= MAX_LISTENERS) return; + const tracker = try a.create(PropertyTracker); + tracker.* = .{}; + errdefer { + tracker.deinit(); + a.destroy(tracker); + } + + // Reserve before clock.onSlot so a subsequent append can't OOM + // and leave the clock pointing at a tracker we then free. 
+ try self.slot_listener_ids.ensureUnusedCapacity(a, 1); + try self.slot_trackers.ensureUnusedCapacity(a, 1); + try self.slot_expected.ensureUnusedCapacity(a, 1); + const id = try self.clock.onSlot(PropertyTracker.onSlot, tracker); + self.slot_listener_ids.appendAssumeCapacity(id); + self.slot_trackers.appendAssumeCapacity(tracker); + self.slot_expected.appendAssumeCapacity(.empty); + }, + .on_epoch => { + if (self.epoch_listener_ids.items.len >= MAX_LISTENERS) return; + const tracker = try a.create(PropertyTracker); + tracker.* = .{}; + errdefer { + tracker.deinit(); + a.destroy(tracker); + } + + try self.epoch_listener_ids.ensureUnusedCapacity(a, 1); + try self.epoch_trackers.ensureUnusedCapacity(a, 1); + try self.epoch_expected.ensureUnusedCapacity(a, 1); + const id = try self.clock.onEpoch(PropertyTracker.onEpoch, tracker); + self.epoch_listener_ids.appendAssumeCapacity(id); + self.epoch_trackers.appendAssumeCapacity(tracker); + self.epoch_expected.appendAssumeCapacity(.empty); + }, + .off_slot => |idx| { + if (idx >= self.slot_listener_ids.items.len) return; + const id = self.slot_listener_ids.items[idx]; + try testing.expect(self.clock.offSlot(id)); + _ = self.slot_listener_ids.orderedRemove(idx); + const t = self.slot_trackers.orderedRemove(idx); + var exp = self.slot_expected.orderedRemove(idx); + try expectEqualSlices(Slot, exp.items, t.slot_events.items); + exp.deinit(a); + t.deinit(); + a.destroy(t); + }, + .off_epoch => |idx| { + if (idx >= self.epoch_listener_ids.items.len) return; + const id = self.epoch_listener_ids.items[idx]; + try testing.expect(self.clock.offEpoch(id)); + _ = self.epoch_listener_ids.orderedRemove(idx); + const t = self.epoch_trackers.orderedRemove(idx); + var exp = self.epoch_expected.orderedRemove(idx); + try expectEqualSlices(Epoch, exp.items, t.epoch_events.items); + exp.deinit(a); + t.deinit(); + a.destroy(t); + }, + .advance_by => |k| { + if (k == 0 or self.model_stopped) return; + const begin = self.model_current_slot; + 
const s_first: Slot = if (begin) |c| c + 1 else 0; + const s_last: Slot = if (begin) |c| c + k else @as(Slot, k) - 1; + + var s: Slot = s_first; + while (true) : (s += 1) { + for (self.slot_expected.items) |*lst| try lst.append(a, s); + if (s > 0) { + const prev_e = (s - 1) / self.spe; + const new_e = s / self.spe; + if (new_e > prev_e) { + for (self.epoch_expected.items) |*lst| try lst.append(a, new_e); + } + } + if (s == s_last) break; + } + self.model_current_slot = s_last; + self.clock.advanceAndDispatch(s_last); + + for (self.waiters.items) |*w| { + if (w.target <= s_last) w.expected_aborted = false; + } + }, + .wait_for_slot_at_offset => |offset| { + if (self.model_stopped) return; + const base: i64 = if (self.model_current_slot) |c| @intCast(c) else -1; + const target_signed = base + offset; + if (target_signed < 0) return; + const target: Slot = @intCast(target_signed); + const fut = try self.clock.waitForSlot(target); + const resolved_now = if (self.model_current_slot) |c| c >= target else false; + try self.waiters.append(a, .{ + .target = target, + .fut = fut, + .expected_aborted = !resolved_now, + }); + }, + .cancel_waiter => |idx| { + if (idx >= self.waiters.items.len) return; + var w = self.waiters.orderedRemove(idx); + w.fut.cancel(); + }, + .stop => { + if (self.model_stopped) return; + self.model_stopped = true; + self.clock.stop(); + }, + } + } + + fn finalize(self: *PropertyState, io: std.Io) !void { + if (!self.model_stopped) { + self.model_stopped = true; + self.clock.stop(); + } + + for (self.slot_trackers.items, self.slot_expected.items) |t, exp| { + try expectEqualSlices(Slot, exp.items, t.slot_events.items); + } + for (self.epoch_trackers.items, self.epoch_expected.items) |t, exp| { + try expectEqualSlices(Epoch, exp.items, t.epoch_events.items); + } + + for (self.waiters.items) |*w| { + const result = w.fut.await(io); + if (w.expected_aborted) { + try testing.expectError(error.Aborted, result); + } else { + try result; + } + } + 
self.waiters.clearRetainingCapacity(); + } +}; + +const expectEqualSlices = std.testing.expectEqualSlices; + +fn genPropertyOp(rng: std.Random, state: *const PropertyState) PropertyOp { + while (true) { + const r = rng.uintLessThan(u32, 100); + if (r < 18) return .on_slot; + if (r < 32) return .on_epoch; + if (r < 42) { + if (state.slot_listener_ids.items.len == 0) continue; + return .{ .off_slot = rng.uintLessThan(usize, state.slot_listener_ids.items.len) }; + } + if (r < 52) { + if (state.epoch_listener_ids.items.len == 0) continue; + return .{ .off_epoch = rng.uintLessThan(usize, state.epoch_listener_ids.items.len) }; + } + if (r < 80) return .{ .advance_by = @intCast(rng.uintLessThan(u32, 8) + 1) }; + if (r < 92) { + const off: i32 = @as(i32, @intCast(rng.uintLessThan(u32, 12))) - 4; + return .{ .wait_for_slot_at_offset = off }; + } + if (r < 98) { + if (state.waiters.items.len == 0) continue; + return .{ .cancel_waiter = rng.uintLessThan(usize, state.waiters.items.len) }; + } + return .stop; + } +} + +fn runPropertyScenario(seed: u64, op_count: u32, io: std.Io) !void { + var prng = std.Random.DefaultPrng.init(seed); + const rng = prng.random(); + + const spe: u64 = 4; + const now_sec = nowSecAt(io); + var clock: EventClock = undefined; + // Genesis far in future → wall-clock never advances; advanceAndDispatch owns time. 
+ try clock.init(testing.allocator, .{ + .genesis_time_sec = now_sec + 1_000_000, + .slot_duration_ms = 1_000, + .slots_per_epoch = spe, + }, io); + defer clock.deinit(); + + var state = PropertyState{ .spe = spe, .clock = &clock }; + defer state.deinit(); + + var i: u32 = 0; + while (i < op_count) : (i += 1) { + const op = genPropertyOp(rng, &state); + try state.applyOp(op); + } + + try state.finalize(io); +} + +test "property: random op sequences match model" { + try runInRuntime(struct { + fn run(io_handle: std.Io) !void { + var seed: u64 = 0; + while (seed < 500) : (seed += 1) { + runPropertyScenario(seed, 50, io_handle) catch |err| { + std.debug.print("property scenario failed at seed={d}: {s}\n", .{ seed, @errorName(err) }); + return err; + }; + } + } + }.run); +} diff --git a/src/clock/SlotClock.zig b/src/clock/SlotClock.zig new file mode 100644 index 000000000..437cef72f --- /dev/null +++ b/src/clock/SlotClock.zig @@ -0,0 +1,327 @@ +//! Layer 1 – Stateful slot clock. +//! +//! Wraps `slot_math` with a `TimeSource` and a cached `current_slot`. +//! Pure-read helpers query wall-clock time; only `advanceTo()` mutates the cache. + +const std = @import("std"); +const slot_math = @import("slot_math.zig"); +const time_source = @import("time_source.zig"); + +const SlotClock = @This(); + +config: Config, +time: TimeSource, +current_slot: ?Slot = null, + +pub const Slot = slot_math.Slot; +pub const Epoch = slot_math.Epoch; +pub const Config = slot_math.Config; +pub const TimeSource = time_source.TimeSource; + +pub const Event = union(enum) { + slot: Slot, + epoch: Epoch, +}; + +pub const AdvanceIterator = struct { + clock: *SlotClock, + target: Slot, + pending_epoch: ?Epoch = null, + + /// Advances the clock one step at a time, yielding slot and epoch events. + /// For each slot advancement: yields .slot first, then .epoch if an epoch boundary was crossed. + /// Returns null when caught up to target. 
+ pub fn next(self: *AdvanceIterator) ?Event { + if (self.pending_epoch) |epoch| { + self.pending_epoch = null; + return .{ .epoch = epoch }; + } + + const current = self.clock.current_slot; + if (current == null) { + self.clock.current_slot = 0; + return .{ .slot = 0 }; + } + + const cur = current.?; + if (cur >= self.target) return null; + if (cur == std.math.maxInt(Slot)) return null; + + const next_slot = cur + 1; + self.clock.current_slot = next_slot; + + const prev_epoch = slot_math.epochAtSlot(self.clock.config, cur); + const new_epoch = slot_math.epochAtSlot(self.clock.config, next_slot); + if (prev_epoch < new_epoch) { + self.pending_epoch = new_epoch; + } + + return .{ .slot = next_slot }; + } +}; + +pub fn init(config: Config, time: TimeSource) error{InvalidConfig}!SlotClock { + try config.validate(); + var self = SlotClock{ + .config = config, + .time = time, + }; + self.current_slot = slot_math.slotAtMs(config, time.nowMs()); + return self; +} + +/// Returns the current wall-clock slot. Pure read — does NOT update +/// the internal `current_slot` cache. Only `advanceTo()` advances the cache. +pub fn currentSlot(self: *const SlotClock) ?Slot { + const now_ms = self.time.nowMs(); + return slot_math.slotAtMs(self.config, now_ms); +} + +pub fn currentEpoch(self: *const SlotClock) ?Epoch { + const slot = self.currentSlot() orelse return null; + return slot_math.epochAtSlot(self.config, slot); +} + +pub fn currentSlotOrGenesis(self: *const SlotClock) Slot { + return self.currentSlot() orelse 0; +} + +pub fn currentEpochOrGenesis(self: *const SlotClock) Epoch { + return self.currentEpoch() orelse 0; +} + +/// Returns the slot the network may be advancing to, accounting for gossip +/// clock disparity. +/// +/// Per phase0/p2p-interface.md, gossip validation rejects future messages with +/// strict `<` (`current_time + MAXIMUM_GOSSIP_CLOCK_DISPARITY < message_time`), +/// so the boundary case (exactly equal) is accepted — hence `<=` here. 
///
pub fn currentSlotWithGossipDisparity(self: *const SlotClock) Slot {
    const now_ms = self.time.nowMs();
    const base = slot_math.slotAtMs(self.config, now_ms) orelse 0;
    if (base == std.math.maxInt(Slot)) return base;

    const upcoming = base + 1;
    const upcoming_start_ms = slot_math.slotStartMs(self.config, upcoming) orelse return base;
    const near_boundary = upcoming_start_ms - now_ms <= self.config.maximum_gossip_clock_disparity_ms;
    return if (near_boundary) upcoming else base;
}

/// See `currentSlotWithGossipDisparity` for the `<=` rationale and the
/// single-snapshot semantics — both apply here too.
pub fn isCurrentSlotGivenGossipDisparity(self: *const SlotClock, slot: Slot) bool {
    const now_ms = self.time.nowMs();
    const tolerance = self.config.maximum_gossip_clock_disparity_ms;
    const base = slot_math.slotAtMs(self.config, now_ms) orelse 0;

    if (slot == base) return true;

    // Within the disparity window before the next boundary: the next slot
    // also counts as "current".
    if (base != std.math.maxInt(Slot)) {
        const boundary_ms = slot_math.slotStartMs(self.config, base + 1) orelse return false;
        if (boundary_ms - now_ms <= tolerance) return slot == base + 1;
    }

    // Within the disparity window after the current boundary: the prior slot
    // also counts.
    if (base > 0) {
        const start_ms = slot_math.slotStartMs(self.config, base) orelse return false;
        if (now_ms - start_ms <= tolerance) return slot == base - 1;
    }

    return false;
}

/// Returns the slot at `now + tolerance_ms`, or null pre-genesis / on overflow.
pub fn slotWithFutureTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
    const shifted_ms = std.math.add(u64, self.time.nowMs(), tolerance_ms) catch return null;
    return slot_math.slotAtMs(self.config, shifted_ms);
}

pub fn slotWithPastTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
    const now_ms = self.time.nowMs();
    // Checked sub: underflow (pre-UNIX-epoch) returns null.
    // Pre-genesis but valid timestamp returns 0.
+ const shifted_ms = std.math.sub(u64, now_ms, tolerance_ms) catch return null; + return slot_math.slotAtMs(self.config, shifted_ms) orelse 0; +} + +pub fn secFromSlot(self: *const SlotClock, slot: Slot, to_sec: ?u64) ?i64 { + const from_sec = slot_math.slotStartSec(self.config, slot) orelse return null; + const end_sec = to_sec orelse @divFloor(self.time.nowMs(), 1000); + const diff = @as(i128, @intCast(end_sec)) - @as(i128, @intCast(from_sec)); + return std.math.cast(i64, diff); +} + +pub fn msFromSlot(self: *const SlotClock, slot: Slot, to_ms: ?u64) ?i64 { + const from_ms = slot_math.slotStartMs(self.config, slot) orelse return null; + const end_ms = to_ms orelse self.time.nowMs(); + const diff = @as(i128, @intCast(end_ms)) - @as(i128, @intCast(from_ms)); + return std.math.cast(i64, diff); +} + +pub fn advanceTo(self: *SlotClock, target: Slot) AdvanceIterator { + return .{ + .clock = self, + .target = target, + }; +} + +const testing = std.testing; + +const test_cfg = Config{ + .genesis_time_sec = 100, + .slot_duration_ms = 12_000, + .slots_per_epoch = 32, + .maximum_gossip_clock_disparity_ms = 500, +}; + +test "pre-genesis returns null, genesis fallback returns zero" { + var fake = time_source.FakeTime{ .ms = 99_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?Slot, null), clock.currentSlot()); + try testing.expectEqual(@as(?Epoch, null), clock.currentEpoch()); + try testing.expectEqual(@as(Slot, 0), clock.currentSlotOrGenesis()); + try testing.expectEqual(@as(Epoch, 0), clock.currentEpochOrGenesis()); +} + +test "currentSlot at genesis and advancing" { + var fake = time_source.FakeTime{ .ms = 100_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?Slot, 0), clock.currentSlot()); + + fake.setMs(112_000); + try testing.expectEqual(@as(?Slot, 1), clock.currentSlot()); + + fake.setMs(124_000); + try testing.expectEqual(@as(?Slot, 2), clock.currentSlot()); +} + 
+test "currentEpoch" { + var fake = time_source.FakeTime{ .ms = 100_000 + 32 * 12 * 1000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?Epoch, 1), clock.currentEpoch()); +} + +test "advanceTo produces correct slot events" { + var fake = time_source.FakeTime{ .ms = 100_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + + var events: [16]Event = undefined; + var count: usize = 0; + var iter = clock.advanceTo(3); + while (iter.next()) |e| { + events[count] = e; + count += 1; + } + + try testing.expectEqual(@as(usize, 3), count); + try testing.expect(events[0] == .slot and events[0].slot == 1); + try testing.expect(events[1] == .slot and events[1].slot == 2); + try testing.expect(events[2] == .slot and events[2].slot == 3); + try testing.expectEqual(@as(?Slot, 3), clock.current_slot); +} + +test "advanceTo across epoch boundary emits slot then epoch" { + var fake = time_source.FakeTime{ .ms = 100_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + clock.current_slot = 31; + + var events: [16]Event = undefined; + var count: usize = 0; + var iter = clock.advanceTo(33); + while (iter.next()) |e| { + events[count] = e; + count += 1; + } + + try testing.expectEqual(@as(usize, 3), count); + try testing.expect(events[0] == .slot and events[0].slot == 32); + try testing.expect(events[1] == .epoch and events[1].epoch == 1); + try testing.expect(events[2] == .slot and events[2].slot == 33); +} + +test "advanceTo from null (pre-genesis)" { + var fake = time_source.FakeTime{ .ms = 99_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?Slot, null), clock.current_slot); + + var events: [16]Event = undefined; + var count: usize = 0; + var iter = clock.advanceTo(2); + while (iter.next()) |e| { + events[count] = e; + count += 1; + } + + try testing.expectEqual(@as(usize, 3), count); + try testing.expect(events[0] == .slot and events[0].slot == 
0); + try testing.expect(events[1] == .slot and events[1].slot == 1); + try testing.expect(events[2] == .slot and events[2].slot == 2); +} + +test "advanceTo already at target returns nothing" { + var fake = time_source.FakeTime{ .ms = 112_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + + var count: usize = 0; + var iter = clock.advanceTo(1); + while (iter.next()) |_| count += 1; + try testing.expectEqual(@as(usize, 0), count); +} + +test "gossip disparity: far from boundary" { + var fake = time_source.FakeTime{ .ms = 103_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(Slot, 0), clock.currentSlotWithGossipDisparity()); + try testing.expect(clock.isCurrentSlotGivenGossipDisparity(0)); + try testing.expect(!clock.isCurrentSlotGivenGossipDisparity(1)); +} + +test "gossip disparity: near next slot boundary" { + var fake = time_source.FakeTime{ .ms = 111_600 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(Slot, 1), clock.currentSlotWithGossipDisparity()); + try testing.expect(clock.isCurrentSlotGivenGossipDisparity(1)); +} + +test "gossip disparity: just after slot boundary" { + var fake = time_source.FakeTime{ .ms = 112_300 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expect(clock.isCurrentSlotGivenGossipDisparity(0)); +} + +test "gossip disparity: exact threshold (500ms) applies inclusively" { + // next_slot_ms - now_ms == 500 → 500 <= 500, disparity applies. + // Slot 1 starts at 112_000ms. 500ms before = 111_500ms. + var fake = time_source.FakeTime{ .ms = 111_500 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(Slot, 1), clock.currentSlotWithGossipDisparity()); + try testing.expect(clock.isCurrentSlotGivenGossipDisparity(1)); + + // 1ms further out (111_499): 112_000 - 111_499 = 501 > 500, disparity does NOT apply. 
+ fake.setMs(111_499); + clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(Slot, 0), clock.currentSlotWithGossipDisparity()); + try testing.expect(!clock.isCurrentSlotGivenGossipDisparity(1)); +} + +test "tolerance helpers" { + var fake = time_source.FakeTime{ .ms = 112_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?Slot, 2), clock.slotWithFutureTolerance(12_000)); + try testing.expectEqual(@as(?Slot, 0), clock.slotWithPastTolerance(12_000)); + try testing.expectEqual(@as(?Slot, null), clock.slotWithFutureTolerance(std.math.maxInt(u64))); + // Underflow (tolerance > now_ms) returns null, not 0 + try testing.expectEqual(@as(?Slot, null), clock.slotWithPastTolerance(112_001)); +} + +test "secFromSlot and msFromSlot" { + var fake = time_source.FakeTime{ .ms = 118_000 }; + var clock = try SlotClock.init(test_cfg, .{ .fake = &fake }); + try testing.expectEqual(@as(?i64, 6), clock.secFromSlot(1, null)); + try testing.expectEqual(@as(?i64, 6000), clock.msFromSlot(1, null)); + try testing.expectEqual(@as(?i64, 0), clock.secFromSlot(1, 112)); +} diff --git a/src/clock/root.zig b/src/clock/root.zig new file mode 100644 index 000000000..f38f7e234 --- /dev/null +++ b/src/clock/root.zig @@ -0,0 +1,25 @@ +//! Zig beacon clock – slot/epoch timing for Ethereum consensus. +//! +//! Three-layer architecture: +//! Layer 0 (`slot_math`) – pure arithmetic, comptime-compatible +//! Layer 1 (`SlotClock`) – stateful clock with time source +//! 
Layer 2 (`EventClock`) – async event loop with listeners and waiters + +pub const slot_math = @import("slot_math.zig"); +pub const time_source = @import("time_source.zig"); +pub const SlotClock = @import("SlotClock.zig"); +pub const EventClock = @import("EventClock.zig"); + +pub const Config = SlotClock.Config; +pub const Slot = SlotClock.Slot; +pub const Epoch = SlotClock.Epoch; + +pub const ListenerId = EventClock.ListenerId; +pub const Error = EventClock.Error; + +test { + _ = slot_math; + _ = time_source; + _ = SlotClock; + _ = EventClock; +} diff --git a/src/clock/slot_math.zig b/src/clock/slot_math.zig new file mode 100644 index 000000000..ab8ed6b54 --- /dev/null +++ b/src/clock/slot_math.zig @@ -0,0 +1,386 @@ +//! Layer 0 – Pure slot/epoch arithmetic. +//! +//! No state, no allocation, no I/O. Every function is comptime-compatible. +//! All overflow paths return `null` (`?T`) instead of panicking. + +const std = @import("std"); +const ct = @import("consensus_types"); +const bounded_array = @import("bounded_array"); + +pub const Slot = ct.primitive.Slot.Type; +pub const Epoch = ct.primitive.Epoch.Type; + +pub const DurationTransition = struct { + from_slot: Slot, + new_duration_ms: u64, +}; + +pub const max_duration_transitions: u32 = 4; + +pub const DurationTransitions = bounded_array.BoundedArray(DurationTransition, max_duration_transitions); + +/// Comptime builder for `Config.duration_transitions`. +pub fn forkTransitions( + comptime list: []const DurationTransition, +) DurationTransitions { + if (list.len > max_duration_transitions) { + @compileError("too many slot duration transitions"); + } + var arr: DurationTransitions = .{}; + inline for (list) |t| arr.push(t); + return arr; +} + +/// `duration_transitions` entries must be sorted strictly ascending by +/// `from_slot`, with non-zero `new_duration_ms` and `from_slot != 0` (validated). 
pub const Config = struct {
    genesis_time_sec: u64,
    slot_duration_ms: u64,
    duration_transitions: DurationTransitions = .{},
    slots_per_epoch: u64,
    maximum_gossip_clock_disparity_ms: u64 = 500,

    /// Rejects zero durations, zero slots-per-epoch, a genesis time whose
    /// millisecond form overflows u64, and transition lists that are not
    /// strictly ascending by `from_slot`.
    pub fn validate(self: Config) error{InvalidConfig}!void {
        if (self.slot_duration_ms == 0 or self.slots_per_epoch == 0) return error.InvalidConfig;
        if (secToMs(self.genesis_time_sec) == null) return error.InvalidConfig;
        var last_from: Slot = 0;
        for (self.duration_transitions.constSlice()) |t| {
            // `from_slot <= last_from` with last_from starting at 0 rejects
            // both out-of-order entries and `from_slot == 0`.
            if (t.from_slot <= last_from or t.new_duration_ms == 0) return error.InvalidConfig;
            last_from = t.from_slot;
        }
    }

    pub fn transitions(self: *const Config) []const DurationTransition {
        return self.duration_transitions.constSlice();
    }

    /// Slot duration applicable at `slot`. Falls back to `slot_duration_ms`
    /// if no transition has fired yet.
    pub fn slotDurationMsAt(self: Config, slot: Slot) u64 {
        // Transitions are strictly ascending; the last one at or before
        // `slot` wins, so scan from the newest entry backwards.
        const list = self.transitions();
        var remaining = list.len;
        while (remaining > 0) {
            remaining -= 1;
            if (list[remaining].from_slot <= slot) return list[remaining].new_duration_ms;
        }
        return self.slot_duration_ms;
    }
};

/// Returns the slot at the given Unix-millisecond timestamp,
/// or null if pre-genesis or on overflow.
/// Precondition: `validate()` accepted `config` — guarantees all durations > 0.
pub fn slotAtMs(config: Config, now_ms: u64) ?Slot {
    std.debug.assert(config.slot_duration_ms != 0);
    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
    if (now_ms < genesis_ms) return null;

    var seg_start_slot: Slot = 0;
    var seg_start_ms: u64 = genesis_ms;
    var seg_duration: u64 = config.slot_duration_ms;

    for (config.transitions()) |t| {
        const seg_slots = t.from_slot - seg_start_slot;
        const seg_ms_total = std.math.mul(u64, seg_slots, seg_duration) catch {
            // Segment exceeds u64 ms — `now_ms` must lie inside it.
+ return seg_start_slot + (now_ms - seg_start_ms) / seg_duration; + }; + if (now_ms - seg_start_ms < seg_ms_total) { + return seg_start_slot + (now_ms - seg_start_ms) / seg_duration; + } + seg_start_ms = std.math.add(u64, seg_start_ms, seg_ms_total) catch return null; + seg_start_slot = t.from_slot; + seg_duration = t.new_duration_ms; + } + + return seg_start_slot + (now_ms - seg_start_ms) / seg_duration; +} + +/// Returns the slot at the given Unix-second timestamp, +/// or null if pre-genesis or on overflow. +pub fn slotAtSec(config: Config, now_sec: u64) ?Slot { + const now_ms = secToMs(now_sec) orelse return null; + return slotAtMs(config, now_ms); +} + +/// Returns the epoch that contains `slot`. +/// Precondition: `validate()` accepted `config` — `slots_per_epoch > 0`. +pub fn epochAtSlot(config: Config, slot: Slot) Epoch { + std.debug.assert(config.slots_per_epoch != 0); + return @divFloor(slot, config.slots_per_epoch); +} + +/// Returns the Unix-millisecond start time of `slot`, or null on overflow. 
pub fn slotStartMs(config: Config, slot: Slot) ?u64 {
    var seg_first_slot: Slot = 0;
    var seg_begin_ms: u64 = secToMs(config.genesis_time_sec) orelse return null;
    var per_slot_ms: u64 = config.slot_duration_ms;

    // Accumulate the total duration of every whole segment that ends at or
    // before `slot`; stop once the segment containing `slot` is reached.
    for (config.transitions()) |t| {
        if (slot < t.from_slot) break;
        const span_ms = std.math.mul(u64, t.from_slot - seg_first_slot, per_slot_ms) catch return null;
        seg_begin_ms = std.math.add(u64, seg_begin_ms, span_ms) catch return null;
        seg_first_slot = t.from_slot;
        per_slot_ms = t.new_duration_ms;
    }

    const into_segment = std.math.mul(u64, slot - seg_first_slot, per_slot_ms) catch return null;
    return std.math.add(u64, seg_begin_ms, into_segment) catch null;
}

/// Returns the Unix-second start time of `slot`, or null on overflow.
/// Sub-second slot durations truncate to the floor second.
pub fn slotStartSec(config: Config, slot: Slot) ?u64 {
    const start_ms = slotStartMs(config, slot) orelse return null;
    return start_ms / 1000;
}

/// Milliseconds until the next slot boundary.
/// Pre-genesis: returns the time until genesis.
/// Returns null only on arithmetic overflow.
pub fn msUntilNextSlot(config: Config, now_ms: u64) ?u64 {
    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
    if (now_ms < genesis_ms) return genesis_ms - now_ms;
    const slot = slotAtMs(config, now_ms) orelse return null;
    // `slot + 1` wraps at u64.maxInt — return null instead of panicking.
+ const next_slot = std.math.add(u64, slot, 1) catch return null; + const next_start = slotStartMs(config, next_slot) orelse return null; + return next_start - now_ms; +} + +fn secToMs(sec: u64) ?u64 { + return std.math.mul(u64, sec, 1000) catch return null; +} + +const testing = std.testing; + +const mainnet = Config{ + .genesis_time_sec = 1_606_824_023, + .slot_duration_ms = 12_000, + .slots_per_epoch = 32, +}; + +test "basic slot math" { + // slotAtSec: genesis is slot 0 + try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec)); + try testing.expectEqual(@as(?Slot, 1), slotAtSec(mainnet, mainnet.genesis_time_sec + 12)); + try testing.expectEqual(@as(?Slot, 2), slotAtSec(mainnet, mainnet.genesis_time_sec + 24)); + + const genesis_ms = mainnet.genesis_time_sec * 1000; + try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms)); + try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 12_000)); + + try testing.expectEqual(@as(Epoch, 0), epochAtSlot(mainnet, 0)); + try testing.expectEqual(@as(Epoch, 0), epochAtSlot(mainnet, 31)); + try testing.expectEqual(@as(Epoch, 1), epochAtSlot(mainnet, 32)); + try testing.expectEqual(@as(Epoch, 1), epochAtSlot(mainnet, 63)); + try testing.expectEqual(@as(Epoch, 2), epochAtSlot(mainnet, 64)); + + try testing.expectEqual(@as(?u64, mainnet.genesis_time_sec), slotStartSec(mainnet, 0)); + try testing.expectEqual(@as(?u64, mainnet.genesis_time_sec + 12), slotStartSec(mainnet, 1)); + try testing.expectEqual(@as(?u64, mainnet.genesis_time_sec + 24), slotStartSec(mainnet, 2)); + + try testing.expectEqual(@as(?u64, mainnet.genesis_time_sec * 1000), slotStartMs(mainnet, 0)); + try testing.expectEqual(@as(?u64, (mainnet.genesis_time_sec + 12) * 1000), slotStartMs(mainnet, 1)); + + try testing.expectEqual(@as(u64, 12_000), mainnet.slotDurationMsAt(0)); + try testing.expectEqual(@as(u64, 12_000), mainnet.slotDurationMsAt(1_000_000)); +} + +test "within-slot timing" { + try 
testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 0)); + try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 6)); + try testing.expectEqual(@as(?Slot, 0), slotAtSec(mainnet, mainnet.genesis_time_sec + 11)); + try testing.expectEqual(@as(?Slot, 1), slotAtSec(mainnet, mainnet.genesis_time_sec + 12)); + + const genesis_ms = mainnet.genesis_time_sec * 1000; + try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 1)); + try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 6_000)); + try testing.expectEqual(@as(?Slot, 0), slotAtMs(mainnet, genesis_ms + 11_999)); + try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 12_000)); + try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 18_000)); + try testing.expectEqual(@as(?Slot, 1), slotAtMs(mainnet, genesis_ms + 23_999)); + try testing.expectEqual(@as(?Slot, 2), slotAtMs(mainnet, genesis_ms + 24_000)); +} + +test "overflow safety" { + try testing.expectEqual(@as(?Slot, null), slotAtSec(mainnet, mainnet.genesis_time_sec - 1)); + try testing.expectEqual(@as(?Slot, null), slotAtSec(mainnet, 0)); + try testing.expectEqual(@as(?Slot, null), slotAtMs(mainnet, 0)); + + try testing.expectEqual(@as(?u64, null), slotStartSec(mainnet, std.math.maxInt(u64))); + try testing.expectEqual(@as(?u64, null), slotStartMs(mainnet, std.math.maxInt(u64))); + + const extreme = Config{ + .genesis_time_sec = std.math.maxInt(u64), + .slot_duration_ms = 12_000, + .slots_per_epoch = 32, + }; + try testing.expectEqual(@as(?Slot, null), slotAtMs(extreme, 0)); + try testing.expectEqual(@as(?u64, null), slotStartSec(extreme, 1)); + try testing.expectEqual(@as(?u64, null), slotStartMs(extreme, 0)); + + try testing.expectEqual(@as(?u64, null), msUntilNextSlot(extreme, 0)); +} + +test "msUntilNextSlot" { + const genesis_ms = mainnet.genesis_time_sec * 1000; + const slot_ms: u64 = 12_000; + + try testing.expectEqual(@as(?u64, 
slot_ms), msUntilNextSlot(mainnet, genesis_ms)); + try testing.expectEqual(@as(?u64, slot_ms - 1), msUntilNextSlot(mainnet, genesis_ms + 1)); + try testing.expectEqual(@as(?u64, slot_ms - 6_000), msUntilNextSlot(mainnet, genesis_ms + 6_000)); + try testing.expectEqual(@as(?u64, 1), msUntilNextSlot(mainnet, genesis_ms + slot_ms - 1)); + try testing.expectEqual(@as(?u64, slot_ms), msUntilNextSlot(mainnet, genesis_ms + slot_ms)); + try testing.expectEqual(@as(?u64, 1_000), msUntilNextSlot(mainnet, genesis_ms - 1_000)); + try testing.expectEqual(@as(?u64, genesis_ms), msUntilNextSlot(mainnet, 0)); + + // Regression: `slot + 1` overflows at max slot — must return null, not panic. + const tight = Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 1, + .slots_per_epoch = 32, + }; + try testing.expectEqual(@as(?u64, null), msUntilNextSlot(tight, std.math.maxInt(u64))); +} + +test "config validate" { + try mainnet.validate(); + + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 0, + .slots_per_epoch = 32, + }).validate()); + + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 12_000, + .slots_per_epoch = 0, + }).validate()); + + try testing.expectEqual(@as(u64, 500), mainnet.maximum_gossip_clock_disparity_ms); + + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = std.math.maxInt(u64), + .slot_duration_ms = 12_000, + .slots_per_epoch = 32, + }).validate()); + + // Zero new_duration_ms in any transition is invalid + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 12_000, + .duration_transitions = forkTransitions(&.{.{ .from_slot = 1024, .new_duration_ms = 0 }}), + .slots_per_epoch = 32, + }).validate()); + + // Transitions must be sorted strictly ascending + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 12_000, + .duration_transitions = 
forkTransitions(&.{ + .{ .from_slot = 2048, .new_duration_ms = 6_000 }, + .{ .from_slot = 1024, .new_duration_ms = 4_000 }, + }), + .slots_per_epoch = 32, + }).validate()); + + // from_slot == 0 is invalid (a transition at genesis is redundant with slot_duration_ms). + var bad_zero: DurationTransitions = .{}; + bad_zero.push(.{ .from_slot = 0, .new_duration_ms = 6_000 }); + try testing.expectError(error.InvalidConfig, (Config{ + .genesis_time_sec = 0, + .slot_duration_ms = 12_000, + .duration_transitions = bad_zero, + .slots_per_epoch = 32, + }).validate()); +} + +const eip7782 = Config{ + .genesis_time_sec = 1_000_000, + .slot_duration_ms = 12_000, + .duration_transitions = forkTransitions(&.{.{ .from_slot = 1024, .new_duration_ms = 6_000 }}), + .slots_per_epoch = 32, +}; + +test "fork-aware: slotDurationMsAt" { + try testing.expectEqual(@as(u64, 12_000), eip7782.slotDurationMsAt(0)); + try testing.expectEqual(@as(u64, 12_000), eip7782.slotDurationMsAt(1023)); + try testing.expectEqual(@as(u64, 6_000), eip7782.slotDurationMsAt(1024)); + try testing.expectEqual(@as(u64, 6_000), eip7782.slotDurationMsAt(2048)); +} + +test "fork-aware: slotStartMs at and across the boundary" { + const genesis_ms = eip7782.genesis_time_sec * 1000; + + try testing.expectEqual(@as(?u64, genesis_ms), slotStartMs(eip7782, 0)); + try testing.expectEqual(@as(?u64, genesis_ms + 12_000), slotStartMs(eip7782, 1)); + + const fork_ms = genesis_ms + 1024 * 12_000; + try testing.expectEqual(@as(?u64, fork_ms), slotStartMs(eip7782, 1024)); + + try testing.expectEqual(@as(?u64, fork_ms + 6_000), slotStartMs(eip7782, 1025)); + try testing.expectEqual(@as(?u64, fork_ms + 6_000 * 100), slotStartMs(eip7782, 1124)); +} + +test "fork-aware: slotAtMs across boundary" { + const genesis_ms = eip7782.genesis_time_sec * 1000; + const fork_ms = genesis_ms + 1024 * 12_000; + + try testing.expectEqual(@as(?Slot, 1023), slotAtMs(eip7782, fork_ms - 12_000)); + try testing.expectEqual(@as(?Slot, 1023), 
slotAtMs(eip7782, fork_ms - 1)); + try testing.expectEqual(@as(?Slot, 1024), slotAtMs(eip7782, fork_ms)); + try testing.expectEqual(@as(?Slot, 1024), slotAtMs(eip7782, fork_ms + 5_999)); + try testing.expectEqual(@as(?Slot, 1025), slotAtMs(eip7782, fork_ms + 6_000)); + try testing.expectEqual(@as(?Slot, 1026), slotAtMs(eip7782, fork_ms + 12_000)); +} + +test "fork-aware: msUntilNextSlot across boundary" { + const genesis_ms = eip7782.genesis_time_sec * 1000; + const fork_ms = genesis_ms + 1024 * 12_000; + + try testing.expectEqual(@as(?u64, 1), msUntilNextSlot(eip7782, fork_ms - 1)); + try testing.expectEqual(@as(?u64, 6_000), msUntilNextSlot(eip7782, fork_ms)); + try testing.expectEqual(@as(?u64, 3_000), msUntilNextSlot(eip7782, fork_ms + 3_000)); +} + +const two_fork = Config{ + .genesis_time_sec = 1_000_000, + .slot_duration_ms = 12_000, + .duration_transitions = forkTransitions(&.{ + .{ .from_slot = 1024, .new_duration_ms = 6_000 }, + .{ .from_slot = 8192, .new_duration_ms = 4_000 }, + }), + .slots_per_epoch = 32, +}; + +test "fork-aware: two transitions" { + const genesis_ms = two_fork.genesis_time_sec * 1000; + const f1_ms = genesis_ms + 1024 * 12_000; // first fork boundary + // Slots 1024..8191 are 6s each → 7168 slots × 6_000 ms + const f2_ms = f1_ms + (8192 - 1024) * 6_000; // second fork boundary + + try testing.expectEqual(@as(u64, 12_000), two_fork.slotDurationMsAt(0)); + try testing.expectEqual(@as(u64, 6_000), two_fork.slotDurationMsAt(1024)); + try testing.expectEqual(@as(u64, 4_000), two_fork.slotDurationMsAt(8192)); + + // slotStartMs across both boundaries + try testing.expectEqual(@as(?u64, f1_ms), slotStartMs(two_fork, 1024)); + try testing.expectEqual(@as(?u64, f2_ms), slotStartMs(two_fork, 8192)); + try testing.expectEqual(@as(?u64, f2_ms + 4_000), slotStartMs(two_fork, 8193)); + + // slotAtMs across both boundaries + try testing.expectEqual(@as(?Slot, 1024), slotAtMs(two_fork, f1_ms)); + try testing.expectEqual(@as(?Slot, 8191), 
slotAtMs(two_fork, f2_ms - 1)); + try testing.expectEqual(@as(?Slot, 8192), slotAtMs(two_fork, f2_ms)); + try testing.expectEqual(@as(?Slot, 8193), slotAtMs(two_fork, f2_ms + 4_000)); +} diff --git a/src/clock/time_source.zig b/src/clock/time_source.zig new file mode 100644 index 000000000..501be9b31 --- /dev/null +++ b/src/clock/time_source.zig @@ -0,0 +1,76 @@ +//! Pluggable time source abstraction. +//! +//! Tagged union with two variants: +//! `.real` – production: reads wall-clock time from `std.Io` +//! `.fake` – testing: reads from a mutable `FakeTime` struct + +const std = @import("std"); +const slot_math = @import("slot_math.zig"); + +/// Production clock backed by std.Io wall-clock time. +pub const RealClock = struct { + io: std.Io, + + pub fn nowMs(self: RealClock) u64 { + const ms = std.Io.Clock.real.now(self.io).toMilliseconds(); + std.debug.assert(ms >= 0); + return @intCast(ms); + } +}; + +/// Controllable time source for deterministic testing. +pub const FakeTime = struct { + ms: u64 = 0, + + pub fn setMs(self: *FakeTime, ms: u64) void { + self.ms = ms; + } + + pub fn advanceMs(self: *FakeTime, delta: u64) void { + self.ms += delta; + } + + /// Advance time by exactly one slot duration. Uses the duration that + /// applies at the slot containing the current time; pre-genesis falls + /// back to the genesis (pre-fork) duration. 
+ pub fn advanceSlot(self: *FakeTime, config: slot_math.Config) void { + const slot = slot_math.slotAtMs(config, self.ms) orelse 0; + self.ms += config.slotDurationMsAt(slot); + } +}; + +pub const TimeSource = union(enum) { + real: RealClock, + fake: *FakeTime, + + pub fn nowMs(self: TimeSource) u64 { + return switch (self) { + .real => |c| c.nowMs(), + .fake => |f| f.ms, + }; + } +}; + +const testing = std.testing; + +test "FakeTime.advanceSlot uses fork-aware duration" { + const cfg = slot_math.Config{ + .genesis_time_sec = 1_000, + .slot_duration_ms = 12_000, + .duration_transitions = slot_math.forkTransitions(&.{ + .{ .from_slot = 2, .new_duration_ms = 6_000 }, + }), + .slots_per_epoch = 32, + }; + + var fake = FakeTime{ .ms = cfg.genesis_time_sec * 1000 }; + // Slot 0 → uses 12_000 ms + fake.advanceSlot(cfg); + try testing.expectEqual(@as(u64, 1_012_000), fake.ms); + // Slot 1 → still 12_000 ms (transition is at slot 2) + fake.advanceSlot(cfg); + try testing.expectEqual(@as(u64, 1_024_000), fake.ms); + // Slot 2 → first post-fork slot, 6_000 ms + fake.advanceSlot(cfg); + try testing.expectEqual(@as(u64, 1_030_000), fake.ms); +}