feat(clock): add clock module for slot/epoch timing #354
Conversation
Three-layer beacon clock ported from ChainSafe/clock-zig, rebased onto current main:

- Layer 0 (`slot_math`) — pure arithmetic, comptime-compatible
- Layer 1 (`SlotClock`) — stateful clock with pluggable `TimeSource`
- Layer 2 (`EventClock`) — async event loop with listeners and waiters

Hooked into zbuild via `.modules.clock` + `.tests.clock`. Self-contained (no internal lodestar-z deps), so the module entry needs no imports.

The module is `std.Io`-generic. Tests exercise it through `std.Io.Threaded` to avoid Zig 0.16.0 stdlib bugs in `Io.Dispatch` (macOS) and `Io.Uring` (Linux).

Concurrency:

- `mutex: std.Io.Mutex` covers all mutable shared state (waiters, listeners, snapshots, `next_listener_id`, `clock.current_slot`).
- `stopped: std.atomic.Value(bool)` lets `runAutoLoop` read it lock-free between sleeps.
- Cancelable `lock(io)` on normal paths, so `error.Canceled` propagates naturally to the caller's next cancelation point; `lockUncancelable` only on cleanup paths (`WaitForSlotResult.cancel`, `abortAllWaiters`).
- Listener callbacks run while the mutex is held — they must not call back into `EventClock` methods (documented on `advanceAndDispatch`).

`WaitForSlotResult` is a tagged union (`immediate` / `pending`) — no peeking into `std.Io.Future` internal fields.

Replaces the long-running 0.16-migration branch, which had drifted 40 commits behind main; only the clock-specific work is preserved here, rebuilt on top of current main (zbuild-based build).
Summary of Changes

This pull request integrates a new, modular clock system into the project for precise slot and epoch timing in a Zig 0.16 environment. The design emphasizes a clear separation of concerns, from fundamental time arithmetic to event-driven asynchronous operations, which improves both maintainability and testability. By streamlining the implementation and dropping the previous migration overhead, this change provides a clean foundation for time-sensitive blockchain operations.
Code Review
This pull request introduces a three-layer beacon clock system for Ethereum consensus, comprising pure arithmetic, stateful tracking, and an async event-driven clock. A critical deadlock risk was identified in EventClock due to holding a mutex while invoking external listener callbacks. Additionally, the implementation falls short of the repository style guide's minimum assertion density and its requirement that all queues and lists have fixed upper bounds. A minor improvement for more idiomatic error handling was also suggested.
```zig
fn advanceAndDispatch(self: *EventClock, target: Slot) std.Io.Cancelable!void {
    try self.mutex.lock(self.io);
    defer self.mutex.unlock(self.io);
    var iter = self.clock.advanceTo(target);
    while (iter.next()) |event| {
        if (self.stopped.load(.acquire)) break;
        switch (event) {
            .slot => |s| {
                self.snapshotSlotListenersLocked();
                self.dispatchWaitersLocked(s);
                for (self.slot_snapshot.items) |listener| {
                    listener.callback(listener.ctx, s);
                }
            },
            .epoch => |e| {
                self.snapshotEpochListenersLocked();
                for (self.epoch_snapshot.items) |listener| {
                    listener.callback(listener.ctx, e);
                }
            },
        }
    }
    // Defensive: handles edge cases where advanceTo yields zero events
    // (already at target) but waiters were added between loop ticks.
    // In the normal case, this is a no-op because the last .slot event
    // already dispatched waiters at the same slot value.
    self.dispatchWaitersLocked(self.clock.current_slot);
}
```
Holding the mutex while invoking external listener callbacks (lines 469 and 475) creates a significant deadlock risk. If a callback attempts to call any EventClock method that triggers a catchUp() (like currentSlot()), it will attempt to re-acquire the same mutex on the same thread/fiber, leading to a deadlock. This is a violation of the 'Safety' design goal. Consider collecting events and their snapshots into a local buffer while holding the lock, then releasing the lock before iterating and invoking the callbacks.
References
- Use only very simple, explicit control flow for clarity. Assertions downgrade catastrophic correctness bugs into liveness bugs. (link)
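The snapshot-then-unlock pattern the review suggests can be sketched as follows. This is a hedged illustration in Python (not the Zig source); the class and method names are hypothetical, and Python's `threading.Lock` stands in for `std.Io.Mutex`:

```python
import threading

class EventDispatcher:
    """Minimal model of lock-snapshot-unlock-dispatch (hypothetical names)."""

    def __init__(self):
        self._lock = threading.Lock()   # non-reentrant, like std.Io.Mutex
        self._listeners = []            # callbacks taking (slot)

    def on_slot(self, cb):
        with self._lock:
            self._listeners.append(cb)

    def dispatch(self, slot):
        # Copy the listener list under the lock...
        with self._lock:
            snapshot = list(self._listeners)
        # ...then invoke callbacks with the lock released, so a callback
        # may safely re-enter on_slot (or any other method) without
        # deadlocking on the same non-reentrant lock.
        for cb in snapshot:
            cb(slot)
```

Listeners registered from inside a callback only take effect on the next dispatch, because the snapshot was taken before the callbacks ran; that is the usual trade-off of this pattern.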
```zig
//! Layer 0 – Pure slot/epoch arithmetic.
//!
//! No state, no allocation, no I/O. Every function is comptime-compatible.
//! All overflow paths return `null` (`?T`) instead of panicking.

const std = @import("std");

// ── Type aliases ──────────────────────────────────────────────────────

pub const Slot = u64;
pub const Epoch = u64;
pub const UnixMs = u64;
pub const UnixSec = u64;

// ── Config ────────────────────────────────────────────────────────────

pub const Config = struct {
    genesis_time_sec: UnixSec,
    seconds_per_slot: u64,
    slots_per_epoch: u64,
    maximum_gossip_clock_disparity_ms: u64 = 500,

    /// Validates that the config is usable (no zero divisors, no sec→ms overflow).
    pub fn validate(self: Config) error{InvalidConfig}!void {
        if (self.seconds_per_slot == 0) return error.InvalidConfig;
        if (self.slots_per_epoch == 0) return error.InvalidConfig;
        // Ensure sec→ms conversions used by msUntilNextSlot won't overflow at runtime.
        if (secToMs(self.genesis_time_sec) == null) return error.InvalidConfig;
        if (secToMs(self.seconds_per_slot) == null) return error.InvalidConfig;
    }

    /// Returns the slot duration in milliseconds, or null on overflow.
    pub fn slotDurationMs(self: Config) ?u64 {
        return secToMs(self.seconds_per_slot);
    }
};

/// Returns the slot at the given Unix-millisecond timestamp,
/// or null if pre-genesis or on overflow.
pub fn slotAtMs(config: Config, now_ms: UnixMs) ?Slot {
    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
    if (now_ms < genesis_ms) return null;
    const slot_ms = secToMs(config.seconds_per_slot) orelse return null;
    if (slot_ms == 0) return null;
    return @divFloor(now_ms - genesis_ms, slot_ms);
}

/// Returns the slot at the given Unix-second timestamp,
/// or null if pre-genesis.
pub fn slotAtSec(config: Config, now_sec: UnixSec) ?Slot {
    if (now_sec < config.genesis_time_sec) return null;
    if (config.seconds_per_slot == 0) return null;
    return @divFloor(now_sec - config.genesis_time_sec, config.seconds_per_slot);
}

/// Returns the epoch that contains `slot`, or null if slots_per_epoch is zero.
pub fn epochAtSlot(config: Config, slot: Slot) ?Epoch {
    if (config.slots_per_epoch == 0) return null;
    return @divFloor(slot, config.slots_per_epoch);
}

/// Returns the Unix-second start time of `slot`, or null on overflow.
pub fn slotStartSec(config: Config, slot: Slot) ?UnixSec {
    const offset = std.math.mul(u64, slot, config.seconds_per_slot) catch return null;
    return std.math.add(u64, config.genesis_time_sec, offset) catch return null;
}

/// Returns the Unix-millisecond start time of `slot`, or null on overflow.
pub fn slotStartMs(config: Config, slot: Slot) ?UnixMs {
    const sec = slotStartSec(config, slot) orelse return null;
    return secToMs(sec);
}

/// Milliseconds until the next slot boundary.
/// Pre-genesis: returns the time until genesis.
/// Returns null only on arithmetic overflow.
pub fn msUntilNextSlot(config: Config, now_ms: UnixMs) ?u64 {
    const genesis_ms = secToMs(config.genesis_time_sec) orelse return null;
    const slot_ms = secToMs(config.seconds_per_slot) orelse return null;
    if (slot_ms == 0) return null;

    if (now_ms < genesis_ms) return genesis_ms - now_ms;

    const delta = now_ms - genesis_ms;
    const rem = delta % slot_ms;
    if (rem == 0) return slot_ms;
    return slot_ms - rem;
}

fn secToMs(sec: u64) ?u64 {
    return std.math.mul(u64, sec, 1000) catch return null;
}
```
The repository style guide requires an average assertion density of at least two assertions per function (Line 54). This file contains several pure arithmetic functions with zero assertions. Adding assertions for invariants (e.g., assert(config.seconds_per_slot > 0) in slotAtMs) would improve safety and documentation by encoding the mental model of the code as per the 'Safety' design goals.
References
- The assertion density of the code must average a minimum of two assertions per function. (link)
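The Layer 0 arithmetic, with the assertion style the review asks for, can be modeled in a few lines. This is a hedged Python sketch (not the Zig source); the constants are illustrative assumptions, and the function names merely mirror those in `slot_math.zig`:

```python
# Illustrative config values (assumptions, not taken from the PR).
GENESIS_SEC = 1_606_824_023
SECONDS_PER_SLOT = 12
SLOTS_PER_EPOCH = 32

def slot_at_ms(now_ms: int):
    """Slot containing `now_ms`, or None if pre-genesis (mirrors slotAtMs)."""
    genesis_ms = GENESIS_SEC * 1000
    if now_ms < genesis_ms:
        return None
    assert SECONDS_PER_SLOT > 0          # config invariant, per the review
    slot = (now_ms - genesis_ms) // (SECONDS_PER_SLOT * 1000)
    assert slot >= 0
    return slot

def epoch_at_slot(slot: int) -> int:
    """Epoch containing `slot` (mirrors epochAtSlot)."""
    assert SLOTS_PER_EPOCH > 0
    return slot // SLOTS_PER_EPOCH

def ms_until_next_slot(now_ms: int) -> int:
    """Milliseconds to the next slot boundary; pre-genesis returns time to genesis."""
    genesis_ms = GENESIS_SEC * 1000
    slot_ms = SECONDS_PER_SLOT * 1000
    if now_ms < genesis_ms:
        return genesis_ms - now_ms
    rem = (now_ms - genesis_ms) % slot_ms
    return slot_ms if rem == 0 else slot_ms - rem
```

Note the quirk the Zig comments also encode: exactly on a slot boundary, `ms_until_next_slot` returns a full slot duration rather than 0.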
```zig
//! Layer 1 – Stateful slot clock.
//!
//! Wraps `slot_math` with a `TimeSource` and a cached `current_slot`.
//! Pure-read helpers query wall-clock time; only `advanceTo()` mutates the cache.

const std = @import("std");
const slot_math = @import("slot_math.zig");
const time_source = @import("time_source.zig");

const SlotClock = @This();

pub const Slot = slot_math.Slot;
pub const Epoch = slot_math.Epoch;
pub const Config = slot_math.Config;
pub const TimeSource = time_source.TimeSource;

pub const Event = union(enum) {
    slot: Slot,
    epoch: Epoch,
};

pub const AdvanceIterator = struct {
    clock: *SlotClock,
    target: Slot,
    pending_epoch: ?Epoch = null,

    /// Advances the clock one step at a time, yielding slot and epoch events.
    /// For each slot advancement: yields .slot first, then .epoch if an epoch boundary was crossed.
    /// Returns null when caught up to target.
    pub fn next(self: *AdvanceIterator) ?Event {
        // If we have a pending epoch event from the previous step, emit it now.
        if (self.pending_epoch) |epoch| {
            self.pending_epoch = null;
            return .{ .epoch = epoch };
        }

        const current = self.clock.current_slot;

        // Genesis case: current_slot is null, advance to slot 0.
        if (current == null) {
            self.clock.current_slot = 0;
            return .{ .slot = 0 };
        }

        const cur = current.?;
        if (cur >= self.target) return null;
        if (cur == std.math.maxInt(Slot)) return null;

        const next_slot = cur + 1;
        self.clock.current_slot = next_slot;

        // Check epoch boundary — epochAtSlot returns ?Epoch.
        const prev_epoch = slot_math.epochAtSlot(self.clock.config, cur);
        const new_epoch = slot_math.epochAtSlot(self.clock.config, next_slot);
        if (prev_epoch) |prev_ep| {
            if (new_epoch) |new_ep| {
                if (prev_ep < new_ep) {
                    self.pending_epoch = new_ep;
                }
            }
        }

        return .{ .slot = next_slot };
    }
};

config: Config,
time: TimeSource,
current_slot: ?Slot = null,

pub fn init(config: Config, time: TimeSource) error{InvalidConfig}!SlotClock {
    try config.validate();
    var self = SlotClock{
        .config = config,
        .time = time,
    };
    self.current_slot = slot_math.slotAtMs(config, time.nowMs());
    return self;
}

/// Returns the current wall-clock slot. Pure read — does NOT update
/// the internal `current_slot` cache. Only `advanceTo()` advances the cache.
pub fn currentSlot(self: *const SlotClock) ?Slot {
    const now_ms = self.time.nowMs();
    return slot_math.slotAtMs(self.config, now_ms);
}

pub fn currentEpoch(self: *const SlotClock) ?Epoch {
    const slot = self.currentSlot() orelse return null;
    return slot_math.epochAtSlot(self.config, slot);
}

pub fn currentSlotOrGenesis(self: *const SlotClock) Slot {
    return self.currentSlot() orelse 0;
}

pub fn currentEpochOrGenesis(self: *const SlotClock) Epoch {
    return self.currentEpoch() orelse 0;
}

pub fn currentSlotWithGossipDisparity(self: *const SlotClock) Slot {
    const current = self.currentSlotOrGenesis();
    if (current == std.math.maxInt(Slot)) return current;
    const now_ms = self.time.nowMs();
    const next_slot = current + 1;
    const next_slot_ms = slot_math.slotStartMs(self.config, next_slot) orelse return current;
    if (next_slot_ms -| now_ms < self.config.maximum_gossip_clock_disparity_ms) {
        return next_slot;
    }
    return current;
}

pub fn isCurrentSlotGivenGossipDisparity(self: *const SlotClock, slot: Slot) bool {
    const current = self.currentSlotOrGenesis();
    if (slot == current) return true;

    const now_ms = self.time.nowMs();

    // Check if close to next slot.
    if (current != std.math.maxInt(Slot)) {
        const next_slot = current + 1;
        const next_slot_ms = slot_math.slotStartMs(self.config, next_slot) orelse return false;
        if (next_slot_ms -| now_ms < self.config.maximum_gossip_clock_disparity_ms) {
            return slot == next_slot;
        }
    }

    // Check if just passed current slot boundary.
    if (current > 0) {
        const current_slot_ms = slot_math.slotStartMs(self.config, current) orelse return false;
        if (now_ms -| current_slot_ms < self.config.maximum_gossip_clock_disparity_ms) {
            return slot == current - 1;
        }
    }

    return false;
}

pub fn slotWithFutureTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
    const now_ms = self.time.nowMs();
    const shifted = @addWithOverflow(now_ms, tolerance_ms);
    if (shifted[1] != 0) return null;
    return slot_math.slotAtMs(self.config, shifted[0]);
}

pub fn slotWithPastTolerance(self: *const SlotClock, tolerance_ms: u64) ?Slot {
    const now_ms = self.time.nowMs();
    // Checked sub: underflow (pre-UNIX-epoch) returns null.
    // Pre-genesis but valid timestamp returns 0.
    const shifted_ms = std.math.sub(u64, now_ms, tolerance_ms) catch return null;
    return slot_math.slotAtMs(self.config, shifted_ms) orelse 0;
}

pub fn secFromSlot(self: *const SlotClock, slot: Slot, to_sec: ?slot_math.UnixSec) ?i64 {
    const from_sec = slot_math.slotStartSec(self.config, slot) orelse return null;
    const end_sec = to_sec orelse @divFloor(self.time.nowMs(), 1000);
    const diff = @as(i128, @intCast(end_sec)) - @as(i128, @intCast(from_sec));
    if (diff < std.math.minInt(i64) or diff > std.math.maxInt(i64)) return null;
    return @intCast(diff);
}

pub fn msFromSlot(self: *const SlotClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 {
    const from_ms = slot_math.slotStartMs(self.config, slot) orelse return null;
    const end_ms = to_ms orelse self.time.nowMs();
    const diff = @as(i128, @intCast(end_ms)) - @as(i128, @intCast(from_ms));
    if (diff < std.math.minInt(i64) or diff > std.math.maxInt(i64)) return null;
    return @intCast(diff);
}

pub fn advanceTo(self: *SlotClock, target: Slot) AdvanceIterator {
    return .{
        .clock = self,
        .target = target,
    };
}
```
Similar to slot_math.zig, this file lacks the required assertion density. Functions like init, currentSlot, and advanceTo should include assertions to verify arguments and internal state invariants, adhering to the repository style guide (Line 54). For example, init could assert that the provided TimeSource is valid or that the initial current_slot calculation succeeded.
References
- The assertion density of the code must average a minimum of two assertions per function. (link)
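The `AdvanceIterator` contract above (a `.slot` event per step, immediately followed by an `.epoch` event when a boundary is crossed, and a jump to slot 0 from the pre-genesis `null` state) can be modeled compactly. A hedged Python sketch, with an assumed `SLOTS_PER_EPOCH` and a generator in place of the stateful iterator:

```python
SLOTS_PER_EPOCH = 32  # illustrative assumption

def advance_events(current, target: int):
    """Model of AdvanceIterator.next(): yields ('slot', n) per step,
    followed by ('epoch', e) whenever an epoch boundary is crossed.
    `current` of None models the pre-genesis state (jump to slot 0)."""
    if current is None:
        yield ('slot', 0)
        current = 0
    while current < target:
        nxt = current + 1
        yield ('slot', nxt)
        if nxt // SLOTS_PER_EPOCH > current // SLOTS_PER_EPOCH:
            yield ('epoch', nxt // SLOTS_PER_EPOCH)
        current = nxt
```

This mirrors the ordering guarantee documented on `next()`: the slot event always precedes the epoch event for the same advancement step.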
| //! Layer 2 – Event-driven beacon clock. | ||
| //! | ||
| //! Combines `SlotClock` with an async I/O loop to emit slot/epoch events | ||
| //! and dispatch waiters. All public methods are safe to call from the | ||
| //! main thread; the internal loop runs as a single cooperative fiber. | ||
|
|
||
| const std = @import("std"); | ||
| const Allocator = std.mem.Allocator; | ||
| const slot_math = @import("slot_math.zig"); | ||
| const SlotClock = @import("SlotClock.zig"); | ||
| const time_source = @import("time_source.zig"); | ||
|
|
||
| const EventClock = @This(); | ||
|
|
||
| pub const Slot = slot_math.Slot; | ||
| pub const Epoch = slot_math.Epoch; | ||
| pub const Config = slot_math.Config; | ||
| pub const ListenerId = u64; | ||
| pub const TimeSource = time_source.TimeSource; | ||
|
|
||
| pub const Error = error{ | ||
| InvalidConfig, | ||
| OutOfMemory, | ||
| ListenerLimitReached, | ||
| Aborted, | ||
| Canceled, | ||
| }; | ||
|
|
||
| const WaitState = struct { | ||
| io: std.Io, | ||
| allocator: Allocator, | ||
| event: std.Io.Event = .unset, | ||
| aborted: bool = false, | ||
| }; | ||
|
|
||
| const WaiterEntry = struct { | ||
| target: Slot, | ||
| state: *WaitState, | ||
| }; | ||
|
|
||
| const SlotListenerEntry = struct { | ||
| id: ListenerId, | ||
| callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, | ||
| ctx: ?*anyopaque, | ||
| }; | ||
|
|
||
| const EpochListenerEntry = struct { | ||
| id: ListenerId, | ||
| callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, | ||
| ctx: ?*anyopaque, | ||
| }; | ||
|
|
||
| const SlotSnapshot = struct { | ||
| callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, | ||
| ctx: ?*anyopaque, | ||
| }; | ||
|
|
||
| const EpochSnapshot = struct { | ||
| callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, | ||
| ctx: ?*anyopaque, | ||
| }; | ||
|
|
||
| const WaiterQueue = std.PriorityQueue(WaiterEntry, void, struct { | ||
| fn compare(_: void, a: WaiterEntry, b: WaiterEntry) std.math.Order { | ||
| return std.math.order(a.target, b.target); | ||
| } | ||
| }.compare); | ||
|
|
||
| allocator: Allocator, | ||
| io: std.Io, | ||
| clock: SlotClock, | ||
|
|
||
| /// Coarse-grained mutex covering all mutable state below (waiters, | ||
| /// listeners, snapshots, next_listener_id, clock.current_slot via | ||
| /// advanceAndDispatch). Required when `io` is a multi-threaded backend | ||
| /// (`std.Io.Threaded`); cheap on a single-threaded fiber backend | ||
| /// (`std.Io.Evented`) — uncontended fast path is one atomic CAS. | ||
| /// | ||
| /// Listener callbacks run WHILE this mutex is held (see | ||
| /// `advanceAndDispatch`); they must not call back into `EventClock`. | ||
| mutex: std.Io.Mutex = .init, | ||
|
|
||
| /// Read lock-free from `runAutoLoop` so the loop's sleep does not need | ||
| /// to hold the mutex. | ||
| stopped: std.atomic.Value(bool) = .init(false), | ||
| loop_future: ?std.Io.Future(void) = null, | ||
|
|
||
| next_listener_id: ListenerId = 1, | ||
| slot_listeners: std.ArrayListUnmanaged(SlotListenerEntry) = .empty, | ||
| epoch_listeners: std.ArrayListUnmanaged(EpochListenerEntry) = .empty, | ||
| slot_snapshot: std.ArrayListUnmanaged(SlotSnapshot) = .empty, | ||
| epoch_snapshot: std.ArrayListUnmanaged(EpochSnapshot) = .empty, | ||
|
|
||
| waiters: WaiterQueue, | ||
|
|
||
| /// Initialise in-place. | ||
| pub fn init(self: *EventClock, allocator: Allocator, config: Config, io_handle: std.Io) Error!void { | ||
| self.* = .{ | ||
| .allocator = allocator, | ||
| .io = io_handle, | ||
| .clock = undefined, | ||
| .waiters = WaiterQueue.initContext({}), | ||
| }; | ||
| self.clock = SlotClock.init(config, .{ .real = .{ .io = io_handle } }) catch return error.InvalidConfig; | ||
| } | ||
|
|
||
| /// Start the auto-advance loop. Idempotent; second call is a no-op. | ||
| pub fn start(self: *EventClock) void { | ||
| if (self.loop_future != null) return; | ||
| self.loop_future = std.Io.async(self.io, EventClock.runAutoLoop, .{self}); | ||
| } | ||
|
|
||
| /// Signal the loop to stop and abort all pending waiters. Idempotent. | ||
| pub fn stop(self: *EventClock) void { | ||
| if (self.stopped.swap(true, .acq_rel)) return; | ||
| self.abortAllWaiters(); | ||
| } | ||
|
|
||
| /// Signal the loop to stop, cancel the fiber, and wait for it to finish. | ||
| pub fn join(self: *EventClock) void { | ||
| self.stop(); | ||
| var maybe_future = self.loop_future; | ||
| self.loop_future = null; | ||
| if (maybe_future) |*future| { | ||
| future.cancel(self.io); | ||
| future.await(self.io); | ||
| } | ||
| } | ||
|
|
||
| /// Release all resources. Calls `stop()` + `join()` internally. | ||
| pub fn deinit(self: *EventClock) void { | ||
| self.stop(); | ||
| self.join(); | ||
| self.slot_snapshot.deinit(self.allocator); | ||
| self.epoch_snapshot.deinit(self.allocator); | ||
| self.slot_listeners.deinit(self.allocator); | ||
| self.epoch_listeners.deinit(self.allocator); | ||
| self.waiters.deinit(self.allocator); | ||
| self.* = undefined; | ||
| } | ||
|
|
||
| // ── Listener API ── | ||
| // NOTE: Listeners should be registered before calling `start()`. | ||
| // Adding listeners from within a callback may silently skip the new listener | ||
| // until the next slot, because snapshot buffers are pre-allocated at registration | ||
| // time and the snapshot helpers only use pre-allocated capacity. | ||
|
|
||
| /// Register a slot listener. Returns an ID for later removal via `offSlot`. | ||
| pub fn onSlot( | ||
| self: *EventClock, | ||
| callback: *const fn (ctx: ?*anyopaque, slot: Slot) void, | ||
| ctx: ?*anyopaque, | ||
| ) Error!ListenerId { | ||
| try self.mutex.lock(self.io); | ||
| defer self.mutex.unlock(self.io); | ||
| if (self.next_listener_id == std.math.maxInt(ListenerId)) return error.ListenerLimitReached; | ||
| // Pre-allocate snapshot buffer BEFORE appending the listener, so that | ||
| // if OOM occurs we haven't modified any state yet. | ||
| self.slot_snapshot.ensureTotalCapacity( | ||
| self.allocator, | ||
| self.slot_listeners.items.len + 1, | ||
| ) catch return error.OutOfMemory; | ||
| self.slot_listeners.append(self.allocator, .{ | ||
| .id = self.next_listener_id, | ||
| .callback = callback, | ||
| .ctx = ctx, | ||
| }) catch return error.OutOfMemory; | ||
| const id = self.next_listener_id; | ||
| self.next_listener_id += 1; | ||
| return id; | ||
| } | ||
|
|
||
| /// Unregister a slot listener. Returns `true` if found and removed. | ||
| pub fn offSlot(self: *EventClock, id: ListenerId) Error!bool { | ||
| try self.mutex.lock(self.io); | ||
| defer self.mutex.unlock(self.io); | ||
| for (self.slot_listeners.items, 0..) |listener, i| { | ||
| if (listener.id == id) { | ||
| _ = self.slot_listeners.orderedRemove(i); | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /// Register an epoch listener. Returns an ID for later removal via `offEpoch`. | ||
| pub fn onEpoch( | ||
| self: *EventClock, | ||
| callback: *const fn (ctx: ?*anyopaque, epoch: Epoch) void, | ||
| ctx: ?*anyopaque, | ||
| ) Error!ListenerId { | ||
| try self.mutex.lock(self.io); | ||
| defer self.mutex.unlock(self.io); | ||
| if (self.next_listener_id == std.math.maxInt(ListenerId)) return error.ListenerLimitReached; | ||
| // Pre-allocate snapshot buffer BEFORE appending the listener, so that | ||
| // if OOM occurs we haven't modified any state yet. | ||
| self.epoch_snapshot.ensureTotalCapacity( | ||
| self.allocator, | ||
| self.epoch_listeners.items.len + 1, | ||
| ) catch return error.OutOfMemory; | ||
| self.epoch_listeners.append(self.allocator, .{ | ||
| .id = self.next_listener_id, | ||
| .callback = callback, | ||
| .ctx = ctx, | ||
| }) catch return error.OutOfMemory; | ||
| const id = self.next_listener_id; | ||
| self.next_listener_id += 1; | ||
| return id; | ||
| } | ||
|
|
||
| /// Unregister an epoch listener. Returns `true` if found and removed. | ||
| pub fn offEpoch(self: *EventClock, id: ListenerId) Error!bool { | ||
| try self.mutex.lock(self.io); | ||
| defer self.mutex.unlock(self.io); | ||
| for (self.epoch_listeners.items, 0..) |listener, i| { | ||
| if (listener.id == id) { | ||
| _ = self.epoch_listeners.orderedRemove(i); | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| // ── Delegated read APIs ── | ||
| // Every public accessor that exposes "current" slot/epoch state calls catchUp() | ||
| // first, matching the TS version where `get currentSlot()` triggers event | ||
| // emission before returning. Pure time-arithmetic helpers (slotWithFutureTolerance, | ||
| // secFromSlot, etc.) do NOT catch up, matching TS which doesn't go through | ||
| // `this.currentSlot` for those. | ||
|
|
||
| pub fn currentSlot(self: *EventClock) std.Io.Cancelable!?Slot { | ||
| try self.catchUp(); | ||
| return self.clock.currentSlot(); | ||
| } | ||
|
|
||
| pub fn currentEpoch(self: *EventClock) std.Io.Cancelable!?Epoch { | ||
| try self.catchUp(); | ||
| return self.clock.currentEpoch(); | ||
| } | ||
|
|
||
| pub fn currentSlotOrGenesis(self: *EventClock) std.Io.Cancelable!Slot { | ||
| try self.catchUp(); | ||
| return self.clock.currentSlotOrGenesis(); | ||
| } | ||
|
|
||
| pub fn currentEpochOrGenesis(self: *EventClock) std.Io.Cancelable!Epoch { | ||
| try self.catchUp(); | ||
| return self.clock.currentEpochOrGenesis(); | ||
| } | ||
|
|
||
| pub fn currentSlotWithGossipDisparity(self: *EventClock) std.Io.Cancelable!Slot { | ||
| try self.catchUp(); | ||
| return self.clock.currentSlotWithGossipDisparity(); | ||
| } | ||
|
|
||
| pub fn isCurrentSlotGivenGossipDisparity(self: *EventClock, slot: Slot) std.Io.Cancelable!bool { | ||
| try self.catchUp(); | ||
| return self.clock.isCurrentSlotGivenGossipDisparity(slot); | ||
| } | ||
|
|
||
| pub fn slotWithFutureTolerance(self: *EventClock, tolerance_ms: u64) ?Slot { | ||
| return self.clock.slotWithFutureTolerance(tolerance_ms); | ||
| } | ||
|
|
||
| pub fn slotWithPastTolerance(self: *EventClock, tolerance_ms: u64) ?Slot { | ||
| return self.clock.slotWithPastTolerance(tolerance_ms); | ||
| } | ||
|
|
||
| pub fn secFromSlot(self: *EventClock, slot: Slot, to_sec: ?slot_math.UnixSec) ?i64 { | ||
| return self.clock.secFromSlot(slot, to_sec); | ||
| } | ||
|
|
||
| pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 { | ||
| return self.clock.msFromSlot(slot, to_ms); | ||
| } | ||
|
|
||
| // ── waitForSlot ── | ||
|
|
||
| /// Return type from `waitForSlot`. The caller MUST either: | ||
| /// - call `await()` to wait for the target slot and release resources, OR | ||
| /// - call `cancel()` to abort and release resources, OR | ||
| /// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`. | ||
| /// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks | ||
| /// the internal WaitState. | ||
| /// | ||
| /// Idiomatic usage with `errdefer`: | ||
| /// var fut = try ec.waitForSlot(target); | ||
| /// errdefer fut.cancel(); | ||
| /// try fut.await(); | ||
| pub const WaitForSlotResult = union(enum) { | ||
| immediate: Error!void, | ||
| pending: Pending, | ||
|
|
||
| pub const Pending = struct { | ||
| inner: std.Io.Future(Error!void), | ||
| state: *WaitState, | ||
| clock: *EventClock, | ||
| }; | ||
|
|
||
| pub fn await(self: *WaitForSlotResult) Error!void { | ||
| switch (self.*) { | ||
| .immediate => |r| return r, | ||
| .pending => |*p| { | ||
| // Use the io that created the future to avoid io-mismatch bugs. | ||
| const result = p.inner.await(p.state.io); | ||
| // Free AFTER await returns — workaround for Zig futex | ||
| // use-after-free where GCD still holds a reference to the | ||
| // event address after wake. | ||
| p.state.allocator.destroy(p.state); | ||
| self.* = .{ .immediate = result }; | ||
| return result; | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| /// Abort a pending wait and release its resources. Idempotent — safe | ||
| /// to call on an already-awaited, already-cancelled, or immediate result. | ||
| pub fn cancel(self: *WaitForSlotResult) void { | ||
| switch (self.*) { | ||
| .immediate => return, | ||
| .pending => |*p| { | ||
| // Remove from waiter queue before freeing, so abortAllWaiters | ||
| // won't dereference the freed state pointer. | ||
| p.clock.mutex.lockUncancelable(p.clock.io); | ||
| for (p.clock.waiters.items, 0..) |entry, i| { | ||
| if (entry.state == p.state) { | ||
| _ = p.clock.waiters.popIndex(i); | ||
| break; | ||
| } | ||
| } | ||
| p.clock.mutex.unlock(p.clock.io); | ||
| p.state.aborted = true; | ||
| p.state.event.set(p.state.io); | ||
| // Must await the fiber so it finishes before we free its state. | ||
| // The fiber returns error.Aborted (expected) or {} (already dispatched). | ||
| _ = p.inner.await(p.state.io) catch |err| { | ||
| std.debug.assert(err == error.Aborted); | ||
| }; | ||
| p.state.allocator.destroy(p.state); | ||
| self.* = .{ .immediate = error.Aborted }; | ||
| }, | ||
| } | ||
| } | ||
| }; | ||
|
|
||
/// Return a future that resolves when the clock reaches `target`.
/// See `WaitForSlotResult` for the caller's obligations.
pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult {
    if (self.stopped.load(.acquire)) return .{ .immediate = error.Aborted };
    // Catch up events then check fast-path against advanced state.
    // catchUp invokes listener callbacks, so we must NOT hold the mutex
    // here — `advanceAndDispatch` takes it internally per state read.
    try self.catchUp();

    try self.mutex.lock(self.io);
    if (self.clock.current_slot) |slot| {
        if (slot >= target) {
            self.mutex.unlock(self.io);
            return .{ .immediate = {} };
        }
    }
    if (self.stopped.load(.acquire)) {
        self.mutex.unlock(self.io);
        return .{ .immediate = error.Aborted };
    }

    const state = self.allocator.create(WaitState) catch {
        self.mutex.unlock(self.io);
        return error.OutOfMemory;
    };
    state.* = .{
        .io = self.io,
        .allocator = self.allocator,
    };

    self.waiters.push(self.allocator, .{
        .target = target,
        .state = state,
    }) catch {
        self.allocator.destroy(state);
        self.mutex.unlock(self.io);
        return error.OutOfMemory;
    };
    self.dispatchWaitersLocked(self.clock.current_slot);
    // Release before spawning the async task — async spawn is quick but
    // shouldn't be inside the EventClock mutex.
    self.mutex.unlock(self.io);

    return .{ .pending = .{
        .inner = std.Io.async(self.io, waitForSlotFutureAwait, .{state}),
        .state = state,
        .clock = self,
    } };
}

// ── Private ──

/// Ensure event-clock state is caught up to wall-clock time.
/// Emits any intermediate slot/epoch events to listeners.
/// No-op if already caught up or pre-genesis (currentSlot() returns null).
fn catchUp(self: *EventClock) std.Io.Cancelable!void {
    if (self.clock.currentSlot()) |wall_slot| {
        try self.advanceAndDispatch(wall_slot);
    }
}

/// Caller must hold `self.mutex`.
fn snapshotSlotListenersLocked(self: *EventClock) void {
    self.slot_snapshot.clearRetainingCapacity();
    const limit = @min(self.slot_listeners.items.len, self.slot_snapshot.capacity);
    for (self.slot_listeners.items[0..limit]) |listener| {
        self.slot_snapshot.appendAssumeCapacity(.{
            .callback = listener.callback,
            .ctx = listener.ctx,
        });
    }
}

/// Caller must hold `self.mutex`.
fn snapshotEpochListenersLocked(self: *EventClock) void {
    self.epoch_snapshot.clearRetainingCapacity();
    const limit = @min(self.epoch_listeners.items.len, self.epoch_snapshot.capacity);
    for (self.epoch_listeners.items[0..limit]) |listener| {
        self.epoch_snapshot.appendAssumeCapacity(.{
            .callback = listener.callback,
            .ctx = listener.ctx,
        });
    }
}

/// Caller must hold `self.mutex`.
fn dispatchWaitersLocked(self: *EventClock, current_slot: ?Slot) void {
    const slot = current_slot orelse return;
    while (self.waiters.peek()) |head| {
        if (head.target > slot) break;
        const waiter = self.waiters.pop().?;
        waiter.state.aborted = false;
        // event.set is thread-safe and does not need the mutex.
        waiter.state.event.set(waiter.state.io);
    }
}

fn abortAllWaiters(self: *EventClock) void {
    self.mutex.lockUncancelable(self.io);
    defer self.mutex.unlock(self.io);
    while (self.waiters.pop()) |waiter| {
        waiter.state.aborted = true;
        waiter.state.event.set(waiter.state.io);
    }
}

/// Advance the underlying clock to `target` and dispatch slot/epoch events.
///
/// IMPORTANT: listener callbacks are invoked WHILE holding `self.mutex` to
/// preserve `iter` consistency across slots. Therefore listener callbacks
/// MUST NOT call back into `EventClock` (no `onSlot`, `offSlot`, `onEpoch`,
/// `offEpoch`, `waitForSlot`, `stop`, …) — doing so deadlocks.
fn advanceAndDispatch(self: *EventClock, target: Slot) std.Io.Cancelable!void {
    try self.mutex.lock(self.io);
    defer self.mutex.unlock(self.io);
    var iter = self.clock.advanceTo(target);
    while (iter.next()) |event| {
        if (self.stopped.load(.acquire)) break;
        switch (event) {
            .slot => |s| {
                self.snapshotSlotListenersLocked();
                self.dispatchWaitersLocked(s);
                for (self.slot_snapshot.items) |listener| {
                    listener.callback(listener.ctx, s);
                }
            },
            .epoch => |e| {
                self.snapshotEpochListenersLocked();
                for (self.epoch_snapshot.items) |listener| {
                    listener.callback(listener.ctx, e);
                }
            },
        }
    }
    // Defensive: handles edge cases where advanceTo yields zero events
    // (already at target) but waiters were added between loop ticks.
    // In the normal case, this is a no-op because the last .slot event
    // already dispatched waiters at the same slot value.
    self.dispatchWaitersLocked(self.clock.current_slot);
}

fn runAutoLoop(self: *EventClock) void {
    while (!self.stopped.load(.acquire)) {
        const now_ms = self.clock.time.nowMs();
        // Config validation guarantees sec→ms won't overflow, so null here
        // indicates a logic bug. Break instead of spinning at 1ms.
        const next_ms = slot_math.msUntilNextSlot(self.clock.config, now_ms) orelse {
            std.log.err("EventClock: msUntilNextSlot returned null (config overflow?), stopping loop", .{});
            self.stop();
            break;
        };
        const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64);

        // Sleep failure: cancellation (from join()) exits the loop;
        // other errors re-check the stopped flag.
        std.Io.sleep(
            self.io,
            std.Io.Duration.fromMilliseconds(sleep_ms),
            .awake,
        ) catch |err| {
            if (err == error.Canceled) break;
            std.log.debug("EventClock: sleep failed ({s}), retrying", .{@errorName(err)});
            continue;
        };

        if (self.stopped.load(.acquire)) break;
        // Only advance after genesis. Before genesis currentSlot() returns
        // null — skipping here prevents emitting slot 0 prematurely.
        if (self.clock.currentSlot()) |slot| {
            self.advanceAndDispatch(slot) catch |err| switch (err) {
                error.Canceled => break,
            };
        }
    }
}

fn waitForSlotFutureAwait(state: *WaitState) Error!void {
    // NOTE: Do NOT free state here. The caller (WaitForSlotResult.await) frees
    // it AFTER this future completes — workaround for Zig futex use-after-free
    // where GCD still holds a reference to the event address after wake.
    state.event.waitUncancelable(state.io);
    if (state.aborted) return error.Aborted;
}
The assertion density in this file is significantly below the required minimum of two assertions per function (Line 54). Critical functions like init, start, stop, and advanceAndDispatch should be infused with assertions to detect programmer errors and enforce invariants, such as checking that the clock is not already stopped when starting or that the target slot is valid.
References
- The assertion density of the code must average a minimum of two assertions per function. (link)
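A hedged sketch of what that could look like for the lifecycle methods — `started` is a hypothetical field added for illustration (the diff only shows `stopped`), and the real EventClock may track lifecycle differently:

```zig
// Hypothetical sketch only — not the module's actual code.
pub fn start(self: *EventClock) Error!void {
    std.debug.assert(!self.started); // starting twice is a programmer error
    std.debug.assert(!self.stopped.load(.acquire)); // no restart after stop
    self.started = true;
    // ... spawn runAutoLoop ...
}

pub fn stop(self: *EventClock) void {
    std.debug.assert(self.started); // stopping an unstarted clock is a bug
    self.stopped.store(true, .release);
    std.debug.assert(self.stopped.load(.acquire));
}
```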
next_listener_id: ListenerId = 1,
slot_listeners: std.ArrayListUnmanaged(SlotListenerEntry) = .empty,
epoch_listeners: std.ArrayListUnmanaged(EpochListenerEntry) = .empty,
slot_snapshot: std.ArrayListUnmanaged(SlotSnapshot) = .empty,
epoch_snapshot: std.ArrayListUnmanaged(EpochSnapshot) = .empty,

waiters: WaiterQueue,
The repository style guide mandates putting a limit on everything, specifically mentioning that all loops and all queues must have a fixed upper bound to prevent infinite loops or tail latency spikes (Line 38-39). The waiters priority queue and the slot_listeners/epoch_listeners array lists currently have no fixed capacity limits, which could lead to memory exhaustion or latency spikes under load.
References
- Put a limit on everything because, in reality, this is what we expect—everything has a limit. For example, all loops and all queues must have a fixed upper bound. (link)
        .clock = undefined,
        .waiters = WaiterQueue.initContext({}),
    };
    self.clock = SlotClock.init(config, .{ .real = .{ .io = io_handle } }) catch return error.InvalidConfig;
Addresses wemeetagain's review on the previous PR — lodestar TS has deprecated `seconds_per_slot` in favor of `slot_duration_ms` ahead of the h-fork move to 6-second slots. Storing duration in ms also keeps sub-second-granularity slot durations representable.

Changes:
- `Config.seconds_per_slot: u64` → `Config.slot_duration_ms: u64`
- `slotDurationMs()` no longer optional — returns `u64`, eliminates the sec→ms overflow path
- `slotAtMs` / `msUntilNextSlot` divide by `slot_duration_ms` directly
- `slotAtSec` / `slotStartSec` go through ms internally for consistency
- `validate()` drops the `secToMs(seconds_per_slot)` overflow check (no longer applicable)

All test fixtures updated; tests still pass on macOS and Linux.
Two P2 findings on the replanted branch:

1. Gossip disparity boundary was strict `<` — a block/sidecar arriving exactly `maximum_gossip_clock_disparity_ms` early was rejected as "next slot, not yet". TS lodestar's gossip future check accepts equality, so we should too. Changed both `<` sites in SlotClock to `<=` (`currentSlotWithGossipDisparity` and `isCurrentSlotGivenGossipDisparity`). The existing test that asserted the old strict behavior is updated.

2. Listener-vs-waiter dispatch order. Under `std.Io.Threaded`, the waiter task wakes on a different OS thread, so `event.set` racing with listener callbacks meant `waitForSlot(...).await()` could return before listeners had finished mutating their state — a test reading `trace.slot_len` right after await could observe the old value. Invoke listener callbacks BEFORE `dispatchWaitersLocked` so await sees the post-callback world.

Both pass on macOS arm64 + Linux x86_64.
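The boundary change in finding 1 can be sketched in isolation (the helper name here is hypothetical; the real comparisons live inside `currentSlotWithGossipDisparity` / `isCurrentSlotGivenGossipDisparity`):

```zig
// A message for the next slot is acceptable if it arrives no more than
// `disparity_ms` early. The old strict `<` rejected the exact-boundary case.
fn withinFutureDisparity(ms_until_next_slot: u64, disparity_ms: u64) bool {
    return ms_until_next_slot <= disparity_ms; // was: `<`
}
```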
Addresses the second half of wemeetagain's review on PR #301: TS lodestar is preparing for EIP-7782 (consensus-specs#4484, anticipated in h-fork) which switches from 12-second to 6-second slots mid-chain. A single `slot_duration_ms` can't express that.

Replaces `Config.slot_duration_ms: u64` with `Config.slot_durations: []const SlotDuration`, where each `SlotDuration` records the first slot at which a duration applies. The first entry must have `start_slot = 0`. Forks that change duration append an entry whose `start_slot = fork_epoch * slots_per_epoch`.

`Config.constantDuration(genesis, ms, slots_per_epoch)` is the single-segment convenience for chains without any duration transition (borrows a static-lifetime schedule slice). `Config.slotDurationMsAt(s)` walks the schedule backwards. `slotAtMs` / `slotStartMs` / `msUntilNextSlot` walk the schedule cumulatively, summing per-segment ms until they find the segment that contains the timestamp / slot.

Validation enforces a non-empty schedule, first entry at slot 0, ascending `start_slot`, and non-zero durations.

New tests cover an EIP-7782-shape config (12s slots up to slot 1024, 6s thereafter) and exercise `slotDurationMsAt`, `slotStartMs`, `slotAtMs`, and `msUntilNextSlot` across the boundary. `SlotClock` and `EventClock` test fixtures migrated to the schedule form. The module API itself is unchanged — `slot_durations` is the only config-shape change.
Aligns 1:1 with lodestar TS ChainConfig fields after EIP-7782 scaffolding (lodekeeper/lodestar@44a4048c):

- SLOT_DURATION_MS → slot_duration_ms
- SLOT_DURATION_MS_EIP7782 → slot_duration_ms_after_fork
- EIP7782_FORK_EPOCH * SLOTS_PER_EPOCH → fork_slot

Both `fork_slot` and `slot_duration_ms_after_fork` default to null; `validate()` requires both set or both null.

Compared to the previous schedule-slice form (`b2806e48`):
- TS field-mapped 1:1 (caller doesn't need to assemble a slice)
- No slice ownership / static-lifetime concern
- Math is simpler — one if branch instead of a segment walk
- Trade-off: hard-coded for ONE fork transition. EIP-7782 is the only slot-duration change on the roadmap; if a third transition ever comes we'll refactor then. TS hasn't even shipped the math for the EIP-7782 transition yet.

Tests retained: the EIP-7782-shape config covers `slotDurationMsAt`, `slotStartMs`, `slotAtMs`, `msUntilNextSlot` across the boundary.
…lice

Switch from the 2-field "fork before/after" form to a primary `slot_duration_ms` plus a default-empty `duration_transitions` slice. Best of both worlds:

- `slot_duration_ms` maps 1:1 to TS `ChainConfig.SLOT_DURATION_MS` — the primary value for chains with no slot-duration change (Ethereum mainnet today). Default-empty `duration_transitions` means simple chains have a clean Config without optional fields.
- One transition (EIP-7782): one entry in the slice; the entry corresponds to TS `(EIP7782_FORK_EPOCH * SLOTS_PER_EPOCH, SLOT_DURATION_MS_EIP7782)`.
- N transitions in a hypothetical future: just append entries — no API break, no `fork_slot_2` ladder, no breaking refactor.

Math walks segments cumulatively the same way the schedule form did, but the first segment's duration comes from `slot_duration_ms` instead of being the first array entry. Validation enforces sorted ascending `from_slot`, non-zero durations, and that `from_slot != 0` (which would conflict with `slot_duration_ms`).

Adds a `two_fork` test config to exercise the N-transition path across both boundaries, alongside the existing EIP-7782-shape config covering one transition. `SlotClock` / `EventClock` test fixtures unchanged — they all use the default empty `duration_transitions`.
Slot duration is a fundamental chain parameter that's expected to
change rarely (Ethereum has had zero changes since beacon chain
genesis; EIP-7782 anticipates one). Replaces the
`[]const DurationTransition` slice with an inline
`[max_duration_transitions]DurationTransition` array (cap = 4).
Eliminates the slice ownership / lifetime concern: Config is now
fully value-typed, copyable, and embeddable in any owning struct
without lifetime gymnastics.
`from_slot == 0` doubles as the "unused entry" sentinel — already
rejected by `validate()` for active entries. `Config.transitions()`
returns the active prefix as a slice for callers that want to
iterate. `forkTransitions(comptime list)` is a comptime-friendly
builder that pads trailing slots with sentinels.
Construction reads the same:
```zig
.duration_transitions = forkTransitions(&.{
.{ .from_slot = 1024, .new_duration_ms = 6_000 },
}),
```
Validation extended to reject "active entry after sentinel" (gap
in the inline array) — sentinels must be a contiguous trailing run.
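Under the inline-array form, the duration lookup described above could look roughly like the following — a sketch built from the names in the commit message (`transitions()`, `from_slot`, `new_duration_ms`), not the actual implementation:

```zig
fn slotDurationMsAt(cfg: Config, slot: Slot) u64 {
    // Walk the active prefix backwards; the last transition at or before
    // `slot` wins. Sentinel entries (from_slot == 0) are excluded because
    // `transitions()` returns only the active prefix.
    const active = cfg.transitions();
    var i = active.len;
    while (i > 0) {
        i -= 1;
        if (slot >= active[i].from_slot) return active[i].new_duration_ms;
    }
    return cfg.slot_duration_ms; // no transition reached yet
}
```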
Codex review caught that `std.Io.async` is allowed to fall back to inline execution in a few cases — Threaded backend's `busy_count >= async_limit`, single_threaded builds, OOM, or `Thread.spawn` failure. If `waitForSlotFutureAwait` runs inline, it blocks on `state.event.waitUncancelable(io)`, hanging the caller's thread before `waitForSlot` even returns the `WaitForSlotResult`.

`std.Io.concurrent` has the stronger guarantee: it always opens a new worker (a fiber on Evented, an OS thread on Threaded), or returns `error.ConcurrencyUnavailable`. Switch to it.

On Evented backends (Dispatch, Uring, Kqueue) the runtime behaviour is unchanged — both `async` and `concurrent` create a fresh fiber and only diverge on OOM. On Threaded the behaviour matches what the test fixture already expects: every waiter gets its own thread up to `concurrent_limit` (default `.unlimited`).

Adds `ConcurrencyUnavailable` to `Error`. On failure, removes the freshly-pushed waiter from the queue under `lockUncancelable` (cleanup must complete) and frees `state` before returning the error.
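The failure path described in the last paragraph might look like this sketch — the waiter-removal helper is hypothetical; only `std.Io.concurrent`, `lockUncancelable`, and `error.ConcurrencyUnavailable` are taken from the commit message:

```zig
const future = std.Io.concurrent(self.io, waitForSlotFutureAwait, .{state}) catch {
    // Cleanup must complete even if the caller is being canceled.
    self.mutex.lockUncancelable(self.io);
    defer self.mutex.unlock(self.io);
    removeWaiterFor(self, state); // hypothetical: pop the entry pushed above
    self.allocator.destroy(state);
    return error.ConcurrencyUnavailable;
};
```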
1. `FakeTime.advanceSlot` was still calling the old `Config.slotDurationMs()` (no-arg, returns `?u64`), which the `slot_duration_ms` + `duration_transitions` refactor removed. It's currently dead code but would fail to compile the moment a caller showed up. Switch to the fork-aware `slotDurationMsAt(slot)` keyed off the slot containing the current `FakeTime.ms`. Adds a test that exercises advance across a fork boundary.

2. `msUntilNextSlot` did `slot + 1` unconditionally. For tight configs (e.g. 1ms slots from epoch 0) at `now_ms = maxInt(u64)`, `slotAtMs` returns `maxInt(Slot)` and the unchecked add overflowed — Debug panics, ReleaseFast wraps. The function's contract is "Returns null only on arithmetic overflow", so use checked `std.math.add(u64, slot, 1)` and propagate `null`. Adds a test at the boundary.
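The overflow fix in point 2 amounts to replacing the unchecked increment with `std.math.add`; roughly (the surrounding shape of `msUntilNextSlot` is assumed):

```zig
// Inside msUntilNextSlot: `slot` may be maxInt(Slot) when now_ms is huge
// and slots are short, so `slot + 1` must be overflow-checked.
const slot = slotAtMs(config, now_ms) orelse return null;
const next_slot = std.math.add(Slot, slot, 1) catch return null; // was: slot + 1
```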
`zig build test:clock` walks tests reachable from `src/clock/root.zig`
via `test {}` block references. The block already pulled `slot_math`,
`SlotClock`, `EventClock` but missed `time_source`, so the
`FakeTime.advanceSlot` test added in 90b9bea never actually ran.
Adds `_ = time_source;` so the suite covers it. Test count goes from
37 to 38; `advanceSlot uses fork-aware duration` now runs as case 12/38.
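The fix is the one-line reference in root.zig's `test {}` block, roughly:

```zig
// src/clock/root.zig — Zig only compiles tests reachable from referenced
// declarations, so each file must be named here.
test {
    _ = slot_math;
    _ = SlotClock;
    _ = EventClock;
    _ = time_source; // previously missing — its tests never ran
}
```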
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 08ea0840e6
for (self.slot_snapshot.items) |listener| {
    listener.callback(listener.ctx, s);
Avoid calling listeners while holding EventClock mutex
advanceAndDispatch invokes listener callbacks while self.mutex is still locked, and those callbacks can legally call public APIs like offSlot, offEpoch, waitForSlot, or stop that try to lock the same mutex again. Because this mutex is non-reentrant, that pattern deadlocks the clock loop and can permanently stall event delivery in production whenever a listener performs common actions such as self-unsubscription or scheduling another wait.
// `Threaded` busy_count >= async_limit), which would block the caller's
// thread inside `event.waitUncancelable` and never return the
// `WaitForSlotResult`.
const future = std.Io.concurrent(self.io, waitForSlotFutureAwait, .{state}) catch {
Provide fallback when concurrent task creation is unavailable
waitForSlot hard-depends on std.Io.concurrent, so on Io implementations/build modes without task-level concurrency this path returns error.ConcurrencyUnavailable and the core waiting API becomes unusable. Since waitForSlot is a primary feature of this module, lacking a non-concurrent fallback means callers can fail at runtime purely due to backend choice rather than slot state.
Codex P1 finding: `advanceAndDispatch` previously held `self.mutex` while
calling listener callbacks, deadlocking any callback that re-entered
public APIs (`offSlot`, `waitForSlot`, …) — which is a common pattern
(self-unsubscription, scheduling new waits).
Restructure so per event the dispatcher:
1. Snapshots listener list under mutex (snapshot helper now grows the
buffer itself, so onSlot/onEpoch no longer touch `slot_snapshot`/
`epoch_snapshot` and cannot realloc them concurrently with iteration)
2. Wakes matching waiters under mutex
3. Releases mutex
4. Invokes callbacks (re-entry into public APIs is now safe)
5. Re-acquires mutex for the next iter step
`iter` is kept alive across the unlock/relock boundary to preserve its
internal `pending_epoch` cursor.
Add `dispatching: atomic.Value(bool)` to serialize `advanceAndDispatch`
across the unlock/relock boundary: the shared snapshot buffer has a
single owner, and callback-driven re-entry (callback → `currentSlot` →
`catchUp` → here) returns immediately instead of recursing.
Snapshot grow at dispatch time can now OOM, so `advanceAndDispatch` /
`catchUp` / read APIs (`currentSlot`, `currentEpoch`, …) widen from
`std.Io.Cancelable!T` to `Error!T`. `runAutoLoop` logs and continues on
OOM, breaks on Canceled, logs+stops on the impossible-but-exhaustive
remainder.
The previous version called `iter.next()` (which advances `clock.current_slot` unrecoverably) BEFORE the snapshot capacity grow. An OOM there would silently drop the in-flight slot/epoch event — the next dispatch's iter starts past it. Move the `ensureTotalCapacity` calls into a single `reserveSnapshotCapacityLocked` invoked at the top of each loop, before `iter.next()`. OOM there leaves iter un-advanced, so the next dispatch covers the same slot.

Also flip `runAutoLoop`'s OOM handling from log-and-continue to log-and-stop. A beacon node that can't grow a small listener snapshot is already failing; quietly retrying every slot tick just floods logs and hides the real problem. Stop the loop and let the supervising layer react (matches CLAUDE.md fail-fast policy).
…elable
The previous commit defended against snapshot-OOM mid-iter by reserving
capacity before each `iter.next()`. But `runAutoLoop` already fail-fasts
on OOM (stops the whole clock), so "preserving the in-flight slot" was
empty insurance — once OOM occurs the clock is dead and no slot will
fire either way.
Replace the explicit OOM error path with a panic. A beacon node that
can't allocate ~16 bytes per listener has process-level problems that
the supervising layer should resolve via restart, not something this
function should plumb through every read API.
Removes:
- `reserveSnapshotCapacityLocked` helper
- `…AssumeCapacity` naming on snapshot helpers
- `Error!T` widening on `currentSlot` / `currentEpoch` /
`currentSlotOrGenesis` / `currentEpochOrGenesis` /
`currentSlotWithGossipDisparity` / `isCurrentSlotGivenGossipDisparity`
- OOM `else` branch in `runAutoLoop`
Net −19 lines; read APIs are back to `Cancelable!T` matching the
pre-replant signatures.
The previous commit panicked on snapshot grow OOM. Switch to returning `error.OutOfMemory` instead — this matches Zig stdlib idiom (allocator failure is a value, not a panic), is testable via `FailingAllocator`, and lets `runAutoLoop` log the failure cleanly before stopping the clock. The net effect for callers is the same (clock dies, supervisor restarts), but the failure mode is observable.

No `reserveSnapshotCapacityLocked` is reintroduced. OOM happens AFTER `iter.next()` advanced the clock past the in-flight slot, so that slot's listeners do not fire — but `runAutoLoop` stops the clock immediately after, and once stopped no further slots fire either. A single missed slot during shutdown does not justify the pre-reservation complexity.

Read APIs (`currentSlot`, `currentEpoch`, `currentSlotOrGenesis`, `currentEpochOrGenesis`, `currentSlotWithGossipDisparity`, `isCurrentSlotGivenGossipDisparity`) widen from `Cancelable!T` to `Error!T`. `runAutoLoop` gains an `else` branch that logs and stops on any non-`Canceled` failure.
…rget preservation, start spawn)

Four issues raised in the latest codex review of the clock-replant branch:

P1 — Wake waiters AFTER listener callbacks (correctness, Threaded race). Previously `dispatchWaitersLocked(s)` ran inside the mutex BEFORE the unlock+callback step. Under `std.Io.Threaded`, the `event.set()` resumed a `waitForSlot(...).await()` on another thread, which could observe state listeners hadn't yet written. Move `dispatchWaitersLocked` to AFTER the relock that follows the callback loop, so the relock's release-acquire ordering covers listener-written state.

P2 — Dispatch before sleeping in `runAutoLoop`. The previous order was compute-sleep-dispatch. If a dispatch ran past a slot boundary, the next loop iteration computed a tiny msUntilNextSlot to the boundary AFTER the elapsed slot, sleeping through that slot's events. Reorder to dispatch-compute-sleep so any time elapsed during the prior callback batch fires immediately.

P3 — Pick up wall-clock drift at the end of the dispatch loop. When concurrent caller B short-circuits via the `dispatching` CAS, B's desired (higher) target was lost. Wrap the iter loop in an outer loop that re-reads `clock.currentSlot()` after the inner exits; if wall-clock advanced past `current_target` (because callbacks were slow OR because a concurrent caller wanted a higher target), bump the target and re-iter. `iter.next()` returns null only after draining its pending_epoch cursor, so recreating iter at the outer-loop boundary cannot drop a queued epoch event.

P4 — `start()` uses `concurrent` and returns `Error!void`. `std.Io.async` is allowed to run inline on backends that can't spawn more work (single-threaded, busy_count >= async_limit, OOM); that would block `start()` indefinitely inside `runAutoLoop`. Switch to `std.Io.concurrent` (mirrors the prior P1 fix on `waitForSlot`) and surface `error.ConcurrencyUnavailable`. All 6 in-tree tests that call `clock.start()` now `try` it.
…re sleep)

P2-A — `WaitForSlotResult.cancel()` raced with the dispatcher / `abortAllWaiters()`: it wrote `state.aborted = true` outside the mutex even when the waiter had already been popped under that mutex by another path. On `std.Io.Threaded` this was a data race against the fiber's read of `state.aborted` after wake, AND could turn a successfully reached slot into `error.Aborted`. Fix: only write the flag while we still own the popped entry inside the critical section. If we don't find ourselves in `waiters`, the dispatcher (or `abortAllWaiters`) already wrote the flag and signaled the event, so just await and free.

P2-B — `runAutoLoop` slept on `.awake`, which excludes suspended host time. Combined with `now_ms`/slot_math running on wall-clock, a host suspend left the loop sleeping its remaining pre-suspend duration after resume — slot dispatch lagged real chain time by however long the suspend lasted. Switch to `.boot`, which is monotonic but counts suspend, so the sleep wakes promptly after resume and we catch up via the existing wall-clock recheck loop in `advanceAndDispatch`.
…eep boundary)

P2 — `waitForSlot` fast-path raced with in-flight slot callbacks. When a dispatch was running outside the mutex (snapshot taken, callbacks executing), `iter.next()` had already advanced `clock.current_slot` to (or past) `target`. A concurrent `waitForSlot(target)` would acquire the mutex during the callback window, hit `slot >= target`, and return `.immediate` — but the listeners for `target` hadn't finished writing the state the awaiter then read. Gate the fast-path on `dispatching == false`. When dispatch is in-flight, queue the waiter; the dispatcher's per-slot `dispatchWaitersLocked` (which now runs after the callback batch under the relock) wakes it in the right order.

P2 — `runAutoLoop` could sleep through a just-started slot. Previously the sleep was computed from `msUntilNextSlot(now_ms)`, which uses wall-clock now to pick the upcoming boundary. If the wall crossed a boundary between `advanceAndDispatch()` returning and this calculation, the helper returned the time to the boundary AFTER the just-started slot — sleeping a full slot and delaying that slot's listener/waiter dispatch. Compute the sleep from the cached next undispatched slot's start time (`slotStartMs(config, current_slot + 1)`) and skip the sleep if that timestamp is already in the past.
…-fiber model

Pivot the threading model: target `std.Io.Evented` (single OS thread, cooperative fibers) instead of trying to be safe under `std.Io.Threaded`. Public methods are called from one execution flow; the internal `runAutoLoop` is its own fiber on the same OS thread, so cooperative scheduling provides mutual exclusion at non-yield points without any explicit lock.

Removes the entire complexity stack that grew defending the multi-threaded mutex model:
- `mutex: std.Io.Mutex` field + every lock/unlock/lockUncancelable call
- `dispatching: atomic.Value(bool)` + the CAS protocol that serialized concurrent dispatchers
- the snapshot-listeners-under-lock + invoke-callbacks-after-unlock dance
- the outer wall-clock recheck loop in `advanceAndDispatch`
- `Error!T` widening on read APIs (`currentSlot`, `currentEpoch`, ...) that came from `mutex.lock`'s `Cancelable` plus `OutOfMemory` from snapshot grow
- `error.ConcurrencyUnavailable` from `std.Io.concurrent` on `start()` (back to `std.Io.async` returning void)
- `stopped: atomic.Value(bool)` (back to a plain bool)

Restores TS-equivalent semantics:
- Listener callbacks run synchronously on the dispatching fiber under `emitSlot` / `emitEpoch`, like `EventEmitter.emit` in TS
- `currentSlot()` etc. are non-fallible; they `catchUp` synchronously before returning, matching `get currentSlot()` triggering emit
- `waitForSlot` keeps the future API; `std.Io.async` is sufficient because under Evented the future's awaited fiber runs cooperatively with `runAutoLoop`'s fiber that signals it

Documented contract: callbacks must not call back into `EventClock` methods (mirrors TS, where re-entering the `currentSlot` getter would recurse `emitEvents`).

Tests still use `std.Io.Threaded` for now — will revisit when the stdlib `Io.Dispatch` (macOS) and `Io.Uring` (Linux) bugs blocking Evented in 0.16 are addressed.

Net −172 lines (1067 → 895 in EventClock.zig).
Codex flagged that under our single-fiber contract, three issues remain that don't depend on the threading model:
- Tests still instantiated `std.Io.Threaded`, contradicting the documented Evented-only model and racing on shared state.
- `runAutoLoop` slept first then dispatched, so a slow callback could push the next slot's events to the boundary AFTER the elapsed slot.
- The `.awake` clock excludes suspended host time even though slot math runs on wall-clock, so a suspend would leave the loop behind real chain time.

Fixes:
- `TestIo` switches from `std.Io.Threaded` to `std.Io.Evented` — Linux uses Uring, macOS uses Dispatch. NOTE: Zig 0.16.0 stdlib has several bugs in both backends (`Io/Uring.zig` missing `ReadOnlyFileSystem` in error sets, `Io/Dispatch.zig` comptime assertion in `deinit`); upstream patches are needed before this builds cleanly out of the box.
- `runAutoLoop` reordered to dispatch-then-sleep, with sleep computed against `current_slot + 1`'s boundary rather than wall-clock now (so a wall-jump between dispatch and sleep doesn't skip a slot).
- Sleep clock changed from `.awake` to `.boot` so suspended host time counts.

Codex's fourth finding (P1: `std.Io.async` may run inline) does not apply under the Evented-only contract — `async` returns once the function yields, which `runAutoLoop` and `waitForSlotFutureAwait` both do at their first I/O wait. The `concurrent`/`ConcurrencyUnavailable` machinery is not needed.
Match the original PR #301 shape that the user asked to revert to:
- `WaitForSlotResult` reverts from a tagged union (`.immediate` / `.pending` switch) to a plain struct that uses `Future{ .any_future = null, .result = ... }` for the immediate case. `Future.await` returns `.result` when `any_future` is null, so the callsite is uniform whether the wait was immediate or pending — no per-call discriminant.
- `await(io)` now takes the `io` parameter explicitly. Removes the `state.io` indirection and the awkwardness of needing `state` to exist for an immediate result.
- `cancel()` collapses to an `orelse return` early-out followed by the cleanup path; no more two-arm switch.
- `advanceAndDispatch` reverts to a module-private `fn` (still callable from in-file tests). It was made `pub` only to keep the outside-of-test API surface stable through earlier rewrites.
- `runAutoLoop` reverts to the simpler `sleep → dispatch` order with `msUntilNextSlot(now_ms)` and the `.awake` clock. The codex P2 findings (catch-up before sleep, suspend-aware sleep) were strict improvements but added complexity that the rest of the lodestar TS reference does not have either; matching TS behavior is preferred here.
- Doc comment trimmed back to the original short version.

Kept the `slot_duration_ms` Config and fork-aware schedule in slot_math — unrelated to the EventClock surgery. Tests adjust to call `fut.await(io_handle)` and use `slot_duration_ms = N_000` instead of `seconds_per_slot = N`.
Comments now describe what the code does instead of pointing back to the TS lodestar implementation. Affected:
- EventClock.zig listener API note (callback re-entry rules)
- EventClock.zig delegated read APIs section header
- SlotClock.zig gossip-disparity-threshold test
- slot_math.zig Config doc
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 400364d267
The clock module's production model targets `std.Io.Evented`, but Zig 0.16.0's stdlib has compilation bugs in both Evented backends: `Io/Uring.zig` (Linux) is missing `error.ReadOnlyFileSystem` from several error sets, and `Io/Dispatch.zig` (macOS) hits a comptime assertion in `deinit`. Until those land upstream, CI cannot build tests against Evented without ad-hoc stdlib patches.

Switch `TestIo` from `std.Io.Evented` to `std.Io.Threaded` so the unit tests build and run cleanly on stock Zig 0.16.0. The clock logic itself does not depend on the backend choice for correctness (no shared mutation between fibers / threads beyond what `std.Io.Event` synchronizes), so Threaded is a valid harness.

Add a `Run clock tests` step to `.github/workflows/CI.yml` so the module is exercised on every push instead of relying on local / codespace runs.
Drop the per-test `TestIo` struct that wrapped `std.Io.Threaded` — `std.testing.io` is the test runner's preset `Io.Threaded` instance, ready to use without per-test init/deinit. 13 tests × 4 lines of boilerplate disappear.
macOS Threaded backend hangs in `Future.cancel + await` in Zig 0.16.0. On Linux the same suite finishes in ~7s; macOS ran past 8 minutes without progress before manual cancel of the workflow. Exclude macOS until the stdlib issue is identified or fixed upstream.
`std.Io.async` is allowed to fall back to running the function inline on the caller when the backend's async pool is exhausted. On `std.Io.Threaded` the default `async_limit` is `CpuCount - 1`, so on a 3-core CI runner the limit is 2.

Concretely, the `multiple waiters are dispatched` test spawns three `waitForSlotFutureAwait` fibers; with `async_limit = 2` the third one runs inline on the test thread, blocks inside `event.waitUncancelable`, and the test never reaches `clock.advanceAndDispatch(3)` to fire the events. Result: deadlock. Locally on a 10-core mac (`async_limit = 9`) the same test passes — which is why the hang only showed up on GitHub's macos-latest runner.

`std.Io.concurrent` does not have the inline fallback: it either spawns a fresh worker (within `concurrent_limit`, which defaults to `unlimited`) or returns `error.ConcurrencyUnavailable`. Use it for both internal spawns and surface the new error variant. `start()` becomes `Error!void` so callers `try` it.

Also reverts the per-test macOS diagnostic CI step that was added to find this hang — `test:clock` now runs as a normal step on every matrix OS.
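The spawn change can be sketched as a before/after pair. Exact `std.Io` signatures should be treated as approximate for Zig 0.16; `waitForSlotFutureAwait` and `clock` are names from the test described above:

```zig
// Before: std.Io.async may run the fiber inline when async_limit is
// exhausted — the third waiter then blocks the spawning test thread
// inside event.waitUncancelable, and dispatch is never reached.
var fut_maybe_inline = std.Io.async(io, waitForSlotFutureAwait, .{ &clock, 3 });

// After: std.Io.concurrent either gets a fresh worker or fails loudly,
// so pool exhaustion surfaces as an error instead of a silent deadlock.
var fut = std.Io.concurrent(io, waitForSlotFutureAwait, .{ &clock, 3 }) catch
    return error.ConcurrencyUnavailable;
```

Failing fast with `error.ConcurrencyUnavailable` is what makes the CPU-count-dependent hang reproducible as a deterministic error instead.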
@codex review again
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 100fe46cee
`std.Io.concurrent` failure path explicitly destroyed `state`, but the `errdefer self.allocator.destroy(state)` declared right after the allocation also fires on an error return — so the same allocation was freed twice. Let `errdefer` handle the cleanup.
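The double-free and its fix can be sketched like this, with assumed names (`WaiterState`, `runWaiter`): the `errdefer` declared at the allocation already fires on any later error return, so the `catch` arm must not also destroy the allocation:

```zig
const state = try self.allocator.create(WaiterState);
errdefer self.allocator.destroy(state); // sole owner of error-path cleanup

// Wrong (the removed code): `catch { self.allocator.destroy(state); return err; }`
// would free `state` here AND trigger the errdefer above — a double free.
state.future = std.Io.concurrent(self.io, runWaiter, .{state}) catch
    return error.ConcurrencyUnavailable; // errdefer frees `state` on this path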
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c1a6d65bf0
```zig
pub fn start(self: *EventClock) Error!void {
    if (self.loop_future != null) return;
    self.loop_future = std.Io.concurrent(self.io, EventClock.runAutoLoop, .{self}) catch
        return error.ConcurrencyUnavailable;
```
Dispatch current slot before starting auto-loop sleep
Starting the clock does not emit the current slot immediately: start() only spawns runAutoLoop, and the loop sleeps until the next boundary before its first advanceAndDispatch. If the process starts in the middle of slot N (a normal restart case), listeners never receive slot N and only see N+1, which can delay or skip slot-scoped work until the next slot. Emit/catch up once during start() (or at the top of runAutoLoop) before the initial sleep.
This logic matches the lodestar TS implementation.
* Add zio v0.11.0 dep; clock module imports it. Tests now spin up a per-test zio.Runtime via a `runInRuntime` helper, replacing the prior `std.testing.io` (threaded) backend — kqueue-backed I/O on macOS sidesteps the concurrent-spawn flakiness chased in recent CI work.
* Tighten `advanceAndDispatch`: move the `stopped` check before `iter.next()` so a callback that calls `stop()` can't leave `current_slot` one ahead of the last-emitted slot.
* Add property-based test (500 seeds × 50 ops, model-based) covering invariants derived from lodestar TS `clock.ts`: slot/epoch exactly once per active listener, waiter resolution at advance, abort on stop, `off()` stops delivery.
* Add focused gap tests: callback-offSlot-self, callback-stop-clock, ListenerLimitReached at maxInt, OOM rollback via FailingAllocator (snapshot/append failure paths), N waiters at the same target slot.
* Document the cooperative single-fiber std.Io contract at file head.

44/44 tests pass on macOS in ~10s.
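A plausible shape for the `runInRuntime` helper, shown only to illustrate the per-test runtime lifecycle — zio's exact API surface here is an assumption, not verified:

```zig
// Spin up a per-test zio.Runtime, run the test body against its Io
// handle, then tear the runtime down.
fn runInRuntime(comptime body: fn (std.Io) anyerror!void) !void {
    var rt = try zio.Runtime.init(std.testing.allocator, .{});
    defer rt.deinit();

    try body(rt.io());
}
```

Each test owning its runtime keeps failures isolated: a hung fiber in one test cannot poison the event loop of the next.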
Removes six `// ── X ──` section markers (rot-prone, redundant with function/test names) and one self-evident `/// Initialise in-place.`. Rewrites the comments on `WaitForSlotResult.await` and `waitForSlotFutureAwait` that attributed the "free after fiber returns" ordering to a Zig std futex / macOS GCD use-after-free workaround. The clock no longer runs on the std backend, so the GCD attribution was outdated. The structural reason (fiber reads `state.aborted` after the wake, so the caller frees only once the fiber has fully returned) still holds and is now what's documented. No code logic changes; 44/44 tests pass.
…ases

* `slot_math.Slot` / `slot_math.Epoch` now alias `consensus_types.primitive.Slot.Type` / `Epoch.Type` instead of redeclaring `u64`, matching the convention used by `state_transition`, `fork_choice`, `config`, and `fork_types`.
* Drop `slot_math.UnixMs` / `slot_math.UnixSec` aliases — they were inline u64 renames with no semantic value; signatures and test casts now use `u64` directly.
* Add `consensus_types` to the `clock` module's imports.

Also a small follow-up to the prior cleanup:

* Drop section dividers in slot_math.zig (`// ── Type aliases ──`, `// ── Config ──`).
* Trim several verbose doc/inline comments down to one-liners while keeping all WHY content (math justifications, overflow handling, sentinel invariants).
* Drop a handful of what-comments in `SlotClock.AdvanceIterator.next` and `isCurrentSlotGivenGossipDisparity` whose information was already on the line below.

No logic changes; 44/44 tests pass.
Use Zig 0.16's for-expression with else clause so `n` becomes a const bound to the loop result, instead of an empty-body while with increment in the continuation expression.
Transitions are validated as ascending by `from_slot`, so a forward
walk that breaks once `from_slot > slot` finds the same answer as the
reverse scan, without the `var i: usize = N; while (i > 0) { i -= 1; ... }`
underflow-avoiding pattern.
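The described loop can be illustrated like this (variable names are assumptions): the for-expression yields a value via `break`, and the `else` arm supplies the result when no break fires, so the result can be `const`:

```zig
// Forward walk over transitions sorted ascending by `from_slot`:
// find the index of the first transition past `slot`, or the length
// if every transition applies.
const n = for (transitions, 0..) |t, i| {
    if (t.from_slot > slot) break i;
} else transitions.len;
```

This replaces both the empty-body `while` with an increment continuation and the `var i = N; while (i > 0) { i -= 1; ... }` reverse scan.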
…epochAtSlot

* `slotAtMs`: drop three `if (seg_duration == 0) return null` checks that `validate()` already prevents. Add an entry-point `std.debug.assert` for `slot_duration_ms != 0` and document the precondition.
* `slotAtSec`: unwrap nested `orelse return null` into a two-step form.
* `epochAtSlot`: drop the `slots_per_epoch == 0` check (`validate()` prevents it) and tighten the return type from `?Epoch` to `Epoch`. Replaces the runtime guard with `std.debug.assert`.
* `SlotClock.AdvanceIterator.next`: with `epochAtSlot` no longer optional, the nested `if (prev_epoch) |...| if (new_epoch) |...|` pyramid collapses to a flat `if (prev_epoch < new_epoch)`.

44/44 tests pass; Codex review clean.
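The guard-to-assert change can be sketched as follows, ignoring the fork-aware duration schedule for brevity; `genesis_ms` and the exact signature are assumptions, not the module's verified API:

```zig
/// Precondition: `cfg` has passed `validate()`, which rejects a zero
/// slot duration — so we assert instead of re-checking at every call.
pub fn slotAtMs(cfg: Config, now_ms: u64) ?Slot {
    std.debug.assert(cfg.slot_duration_ms != 0); // guaranteed by validate()
    if (now_ms < cfg.genesis_ms) return null; // before genesis: no slot yet
    return (now_ms - cfg.genesis_ms) / cfg.slot_duration_ms;
}
```

An assert documents the invariant and catches violations in debug builds, while `?Slot` stays reserved for the genuinely reachable "before genesis" case.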
`currentSlotWithGossipDisparity` and `isCurrentSlotGivenGossipDisparity` previously called `time.nowMs()` twice — once indirectly via `currentSlotOrGenesis()` and once directly — creating a wall-clock race where the two reads could observe different times and disagree about which slot now lives in. Read `nowMs` once and derive both the current slot and the boundary distance from that single snapshot. This matches the spec's single `current_time` variable (`current_time + MAXIMUM_GOSSIP_CLOCK_DISPARITY < message_time`) and lets `now_ms < next_slot_ms` be a real invariant — so the saturating subs `-|` collapse back to plain `-`. Add a doc comment on `currentSlotWithGossipDisparity` citing the spec rationale for `<=` over `<`.
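The single-snapshot pattern described above, sketched with assumed helper names (`slotAtMs`, `slotStartMs`): one `nowMs()` read feeds both derived values, so the current slot and the boundary distance can never disagree about "now":

```zig
const now_ms = self.time.nowMs(); // the spec's single `current_time`
const current_slot = slotAtMs(self.config, now_ms) orelse return false;
const next_slot_ms = slotStartMs(self.config, current_slot + 1);
// Invariant by construction: now_ms < next_slot_ms, so plain `-` is
// safe and the saturating `-|` is no longer needed.
const until_next_ms = next_slot_ms - now_ms;
```

With two independent `nowMs()` reads, a slot boundary could fall between them and the derived values would describe different slots; the snapshot makes that window impossible.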
Apply TigerBeetle "resource grouping" style — `defer`/`errdefer` stays
glued to the acquisition above it, and a blank line separates each
resource pair from the next block. Four sites:
* `runInRuntime`: between `defer rt.deinit()` and `var handle = ...`.
* Property test `.on_slot` / `.on_epoch`: between the `errdefer { ... }`
tracker-cleanup block and the subsequent capacity reservations.
* `multiple waiters` test: between the three sequential
`var futN = try clock.waitForSlot(...); errdefer futN.cancel();` pairs.
`errdefer fut.cancel(); try fut.await(io);` is left adjacent — that's
the "guard the consumption" idiom, not two separate resource pairs.
Adds `src/bounded_array.zig`, a stack-resident fixed-capacity array with
infallible runtime ops. Registered as a top-level module so other
modules can use it.
Uses BoundedArray in two places that previously relied on heap-backed
collections plus implicit invariants:
* `slot_math.Config.duration_transitions`: replaces
`[max_duration_transitions]DurationTransition` + `from_slot == 0`
sentinel padding with `BoundedArray(DurationTransition, 4)`. Removes
the sentinel concept entirely — `count` tracks active length.
Simplifies `validate()` (no more `seen_sentinel` state machine),
`transitions()` (returns `constSlice()` directly), and
`forkTransitions()` (push loop instead of splat-and-overwrite).
* `EventClock.{slot,epoch}_{listeners,snapshot}`: replaces 4
`ArrayListUnmanaged` fields with `BoundedArray`. Drops the init-time
`ensureTotalCapacityPrecise` pre-allocation dance and the corresponding
errdefer chain. Drops the four `.deinit(allocator)` calls in
`EventClock.deinit`. `onSlot/onEpoch` use the type's `full()` predicate
for limit checks and infallible `push()` for appends.
Hard caps for clock buffers, chosen to be tight enough to catch
accidental over-registration:
max_slot_listeners = 16
max_epoch_listeners = 16
max_waiters = 1024
Separates the waiter limit from listener limits with a new
`error.WaiterLimitReached` (waitForSlot previously reused
`ListenerLimitReached`, which was misleading).
Also folds in `runAutoLoop` post-condition `assert(self.stopped)`,
making the non-terminating event loop's exit invariant explicit per
TigerStyle "where a loop cannot terminate, this must be asserted".
44/44 clock tests pass; 9/9 bounded_array tests pass (including a
comptime-construction test that locks the type's comptime-friendliness
for use in `forkTransitions`).
Move instance fields above type/constant declarations in `EventClock`
and `SlotClock` so the layout reads:
const Foo = @This();
<fields>
<pub consts / nested types>
<methods>
This matches Zig stdlib (`std.ArrayList`), TigerStyle's "fields first,
then types, then methods" rule, and the rest of this project (e.g.
`state_transition/cache/epoch_cache.zig`). Zig resolves identifiers
across struct members at comptime, so fields can reference types
declared lexically later in the same struct.
No behaviour change; 44/44 clock tests still pass.
@codex review
Codex Review: Didn't find any major issues. Delightful!
Motivation
Lodestar (TS) needs a beacon clock that emits slot/epoch events and lets callers wait on specific slots. This adds the equivalent in Zig as a self-contained module under
`src/clock/`, ported from ChainSafe/clock-zig and adapted to Zig 0.16's `std.Io` async model.

Description
Three-layer architecture:
- Layer 0 (`slot_math`) — pure arithmetic, comptime-compatible.
- Layer 1 (`SlotClock`) — stateful clock with pluggable `TimeSource`.
- Layer 2 (`EventClock`) — async event loop with listeners and waiters; built on `std.Io`.
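Hypothetical end-to-end use of the three layers, assembled from this PR's descriptions — the signatures here are assumptions for illustration, not the verified module API:

```zig
const slot = slot_math.slotAtMs(config, time_source.nowMs()); // Layer 0: pure math
var clock = SlotClock.init(config, time_source);              // Layer 1: stateful clock
var events = try EventClock.init(allocator, io, &clock);      // Layer 2: event loop

const id = try events.onSlot(mySlotListener); // fires once per slot
defer events.offSlot(id);

try events.start(); // spawns runAutoLoop via std.Io.concurrent
```

Each layer only depends on the one below it, so slot arithmetic stays testable at comptime and the event loop is the only place that touches `std.Io`.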