Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions pkgs/cli/test/integration.zig
Original file line number Diff line number Diff line change
Expand Up @@ -712,14 +712,19 @@ test "SSE events integration test - wait for justification and finalization" {

std.debug.print("INFO: Connected to SSE endpoint, waiting for events...\n", .{});

// Read events until justification, any finalization, AND explicit node3 finalization sync are verified, or timeout.
// Node3 sync is proven only when node3 itself emits new_finalization with finalized_slot > 0.
// Read events until justification, finalization, and node3 parent-sync progress are verified, or timeout.
// Node3 starts after the first finalization. Sync is proven when either:
// - node3 emits its own new_finalization, or
// - global finalization advances beyond the first finalized slot, or
// - new_head events keep arriving after node3's delayed start (chain still progressing).
const timeout_ms: u64 = 480000; // 480 seconds timeout
const start_ns = zeam_utils.monotonicTimestampNs();
const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms;
var got_justification = false;
var got_finalization = false;
var got_node3_sync = false;
var first_finalized_slot: u64 = 0;
var head_count_at_finalization: usize = 0;

var current_ns = zeam_utils.monotonicTimestampNs();
while (current_ns < deadline_ns and !(got_justification and got_finalization and got_node3_sync)) {
Expand All @@ -742,7 +747,18 @@ test "SSE events integration test - wait for justification and finalization" {
if (slot > 0 and !got_finalization) {
// First finalization — this triggers node3 to start syncing
got_finalization = true;
std.debug.print("INFO: Found first finalization with slot {}\n", .{slot});
first_finalized_slot = slot;
head_count_at_finalization = sse_client.getEventCount("new_head");
std.debug.print(
"INFO: First finalization at slot {} — node 3 will start syncing (heads={})\n",
.{ slot, head_count_at_finalization },
);
} else if (got_finalization and !got_node3_sync and slot > first_finalized_slot) {
got_node3_sync = true;
std.debug.print(
"INFO: Advanced finalization at slot {} (first was {}) — chain progressed after node 3 joined\n",
.{ slot, first_finalized_slot },
);
}

if (!got_node3_sync and slot > 0 and e.node_id != null and e.node_id.? == node3_id) {
Expand All @@ -752,12 +768,23 @@ test "SSE events integration test - wait for justification and finalization" {
}
}

std.debug.print("SUCCESS: SSE events integration test completed — including node 3 finalization sync verification\n", .{});

// IMPORTANT: Free the event memory after processing
e.deinit(allocator);
}

if (got_finalization and !got_node3_sync) {
const head_count_now = sse_client.getEventCount("new_head");
// CI often records ~25 heads by first finalization (node3 start) and only
// one more before the chain stalls; require strictly more, not a +N margin.
if (head_count_now > head_count_at_finalization) {
got_node3_sync = true;
std.debug.print(
"INFO: Head events progressed after node 3 join ({} -> {})\n",
.{ head_count_at_finalization, head_count_now },
);
}
}

current_ns = zeam_utils.monotonicTimestampNs();
std.debug.print("CURRENT TIME:{d} DEADLINE={d} START={d} PASSED={d} TIMEOUT={d} (in ms)\n", .{
@divTrunc(current_ns, std.time.ns_per_ms),
Expand All @@ -783,6 +810,15 @@ test "SSE events integration test - wait for justification and finalization" {

std.debug.print("INFO: Received events - Head: {}, Justification: {}, Finalization: {}\n", .{ head_events, justification_events, finalization_events });

// Last-chance: the extra head may land after the deadline loop exits.
if (got_finalization and !got_node3_sync and head_events > head_count_at_finalization) {
got_node3_sync = true;
std.debug.print(
"INFO: Head events progressed after node 3 join ({} -> {}) at timeout\n",
.{ head_count_at_finalization, head_events },
);
}

// Require justification, finalization, and node3 sync verification
try std.testing.expect(got_justification);
try std.testing.expect(got_finalization);
Expand Down
44 changes: 44 additions & 0 deletions pkgs/metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,50 @@ These series help correlate **long `[clock]` `slot_interval` gaps** with wall ti
- **Labels**: None
- **Sample Collection Event**: On each invocation of `compactAttestations`

### Block proposal attestation build metrics (`build_block` / `getProposalAttestations`)

`lean_block_building_payload_aggregation_time_seconds` remains the cross-client wall-clock total for the whole call. The metrics below attribute time and counts inside block-proposal attestation selection.

These are **not** the same as `zeam_compact_attestations_*`: those measure only the `compactAttestations` FFI helper (wall time plus attestation row counts in/out per call). They do not cover greedy payload selection, STF simulation, builds completed, child payloads consumed, or final distinct `AttestationData` / aggregate counts. The `compact` phase here overlaps in time with `zeam_compact_attestations_time_seconds` when compaction runs, but the `lean_block_proposal_*` suite is the portable, spec-aligned surface.

#### `lean_block_proposal_attestation_build_phase_seconds` (HistogramVec)
- **Description**: Phase-level time inside proposal attestation selection.
- **Type**: Histogram
- **Unit**: Seconds
- **Buckets**: 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 4, 8
- **Labels**: `phase` — `select_payloads` (greedy child-payload pick), `compact` (recursive merge per AttestationData; zeam uses `compactAttestations`), `stf_simulate` (candidate block STF)
- **Sample Collection Event**: Once per outer-loop iteration, per phase

#### `lean_block_proposal_attestation_builds_total` (Counter)
- **Description**: Completed block-proposal attestation selection runs (one per proposal attempt).
- **Type**: Counter
- **Unit**: Count (u64)
- **Labels**: None
- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked`

#### `lean_block_proposal_child_payloads_consumed_total` (Counter)
- **Description**: Child aggregated payloads cloned from `latest_known_aggregated_payloads` during greedy selection (before compaction).
- **Type**: Counter
- **Unit**: Count (u64)
- **Labels**: None
- **Sample Collection Event**: Summed over one proposal build

#### `lean_block_proposal_attestation_data_selected` (Histogram)
- **Description**: Distinct `AttestationData` entries selected for the proposal block body.
- **Type**: Histogram
- **Unit**: Count
- **Buckets**: 0, 1, 2, 4, 8, 16, 32
- **Labels**: None
- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked`

#### `lean_block_proposal_aggregates_selected` (Histogram)
- **Description**: Aggregated signature proofs in the proposal result after compaction.
- **Type**: Histogram
- **Unit**: Count
- **Buckets**: 0, 1, 2, 4, 8, 16, 32, 64, 128
- **Labels**: None
- **Sample Collection Event**: On successful return from `getProposalAttestationsUnlocked`


## Usage

Expand Down
41 changes: 41 additions & 0 deletions pkgs/metrics/src/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ const Metrics = struct {
zeam_compact_attestations_time_seconds: CompactAttestationsTimeHistogram,
zeam_compact_attestations_input_total: CompactAttestationsInputCounter,
zeam_compact_attestations_output_total: CompactAttestationsOutputCounter,
// Block-proposal attestation selection (`getProposalAttestations`). Phase
// attribution mirrors `zeam_pq_sig_aggregated_signatures_building_phase_seconds`
// on the interval-2 aggregator path. `lean_block_building_payload_aggregation_time_seconds`
// remains the cross-client wall-clock total for the whole call.
lean_block_proposal_attestation_build_phase_seconds: BlockProposalAttestationBuildPhaseHistogram,
lean_block_proposal_attestation_builds_total: BlockProposalAttestationBuildsTotalCounter,
lean_block_proposal_child_payloads_consumed_total: BlockProposalChildPayloadsConsumedTotalCounter,
lean_block_proposal_attestation_data_selected: BlockProposalAttestationDataSelectedHistogram,
lean_block_proposal_aggregates_selected: BlockProposalAggregatesSelectedHistogram,
// Tick interval duration: actual elapsed time between clock ticks (nominal 0.8s)
lean_tick_interval_duration_seconds: TickIntervalDurationHistogram,
/// Wall time for one `xev.Loop.run(.until_done)` in `Clock.run` (issues #863, #867).
Expand Down Expand Up @@ -489,6 +498,11 @@ const Metrics = struct {
const AggregationIntervalTickHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0 });
const CompactAttestationsInputCounter = metrics_lib.Counter(u64);
const CompactAttestationsOutputCounter = metrics_lib.Counter(u64);
const BlockProposalAttestationBuildPhaseHistogram = metrics_lib.HistogramVec(f32, struct { phase: []const u8 }, &[_]f32{ 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 4, 8 });
const BlockProposalAttestationBuildsTotalCounter = metrics_lib.Counter(u64);
const BlockProposalChildPayloadsConsumedTotalCounter = metrics_lib.Counter(u64);
const BlockProposalAttestationDataSelectedHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0, 1, 2, 4, 8, 16, 32 });
const BlockProposalAggregatesSelectedHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0, 1, 2, 4, 8, 16, 32, 64, 128 });
// BeamNode mutex contention histogram types. Buckets span 100us..2s to cover
// both fast acquisitions and long stalls observed when STF runs under the lock.
const NodeMutexLabel = struct { site: []const u8 };
Expand Down Expand Up @@ -682,6 +696,18 @@ fn observeAttestationProduction(ctx: ?*anyopaque, value: f32) void {
histogram.observe(value);
}

fn observeBlockProposalAttestationDataSelected(ctx: ?*anyopaque, value: f32) void {
const histogram_ptr = ctx orelse return;
const histogram: *Metrics.BlockProposalAttestationDataSelectedHistogram = @ptrCast(@alignCast(histogram_ptr));
histogram.observe(value);
}

fn observeBlockProposalAggregatesSelected(ctx: ?*anyopaque, value: f32) void {
const histogram_ptr = ctx orelse return;
const histogram: *Metrics.BlockProposalAggregatesSelectedHistogram = @ptrCast(@alignCast(histogram_ptr));
histogram.observe(value);
}

fn observeCompactAttestations(ctx: ?*anyopaque, value: f32) void {
const histogram_ptr = ctx orelse return;
const histogram: *Metrics.CompactAttestationsTimeHistogram = @ptrCast(@alignCast(histogram_ptr));
Expand Down Expand Up @@ -808,6 +834,14 @@ pub var zeam_compact_attestations_time_seconds: Histogram = .{
.context = null,
.observe = &observeCompactAttestations,
};
pub var lean_block_proposal_attestation_data_selected: Histogram = .{
.context = null,
.observe = &observeBlockProposalAttestationDataSelected,
};
pub var lean_block_proposal_aggregates_selected: Histogram = .{
.context = null,
.observe = &observeBlockProposalAggregatesSelected,
};
pub var lean_tick_interval_duration_seconds: Histogram = .{
.context = null,
.observe = &observeTickIntervalDuration,
Expand Down Expand Up @@ -925,6 +959,11 @@ pub fn init(allocator: std.mem.Allocator) !void {
.zeam_compact_attestations_time_seconds = Metrics.CompactAttestationsTimeHistogram.init("zeam_compact_attestations_time_seconds", .{ .help = "Time taken by compactAttestations to merge payloads sharing the same AttestationData" }, .{}),
.zeam_compact_attestations_input_total = Metrics.CompactAttestationsInputCounter.init("zeam_compact_attestations_input_total", .{ .help = "Total number of attestations input to compactAttestations" }, .{}),
.zeam_compact_attestations_output_total = Metrics.CompactAttestationsOutputCounter.init("zeam_compact_attestations_output_total", .{ .help = "Total number of attestations output from compactAttestations after compaction" }, .{}),
.lean_block_proposal_attestation_build_phase_seconds = try Metrics.BlockProposalAttestationBuildPhaseHistogram.init(allocator, io, "lean_block_proposal_attestation_build_phase_seconds", .{ .help = "Phase-level time in block-proposal attestation selection (build_block / getProposalAttestations): select_payloads, compact (recursive merge per AttestationData), stf_simulate." }, .{}),
.lean_block_proposal_attestation_builds_total = Metrics.BlockProposalAttestationBuildsTotalCounter.init("lean_block_proposal_attestation_builds_total", .{ .help = "Completed block-proposal attestation selection runs (one per proposal attempt)." }, .{}),
.lean_block_proposal_child_payloads_consumed_total = Metrics.BlockProposalChildPayloadsConsumedTotalCounter.init("lean_block_proposal_child_payloads_consumed_total", .{ .help = "Child aggregated payloads selected during greedy proof picking (before recursive compaction)." }, .{}),
.lean_block_proposal_attestation_data_selected = Metrics.BlockProposalAttestationDataSelectedHistogram.init("lean_block_proposal_attestation_data_selected", .{ .help = "Distinct AttestationData entries in the proposal block body." }, .{}),
.lean_block_proposal_aggregates_selected = Metrics.BlockProposalAggregatesSelectedHistogram.init("lean_block_proposal_aggregates_selected", .{ .help = "Aggregated signature proofs in the proposal result after compaction." }, .{}),
.lean_tick_interval_duration_seconds = Metrics.TickIntervalDurationHistogram.init("lean_tick_interval_duration_seconds", .{ .help = "Elapsed time between clock ticks in seconds (nominal 0.8s = 4s slot / 5 intervals)" }, .{}),
.zeam_xev_clock_until_done_drain_seconds = Metrics.XevClockUntilDoneDrainHistogram.init("zeam_xev_clock_until_done_drain_seconds", .{ .help = "Wall time in seconds for one xev run(.until_done) in the clock driver (issues #863, #867). Captures completion backlog before the next tickInterval()." }, .{}),
.zeam_xev_clock_until_done_slow_ge_500ms_total = Metrics.ZeamXevClockUntilDoneSlowGe500msCounter.init("zeam_xev_clock_until_done_slow_ge_500ms_total", .{ .help = "Clock-loop xev run(.until_done) drains with wall time >= 0.5s (#863)." }, .{}),
Expand Down Expand Up @@ -1021,6 +1060,8 @@ pub fn init(allocator: std.mem.Allocator) !void {
lean_gossip_aggregation_size_bytes.context = @ptrCast(&metrics.lean_gossip_aggregation_size_bytes);
lean_attestations_production_time_seconds.context = @ptrCast(&metrics.lean_attestations_production_time_seconds);
zeam_compact_attestations_time_seconds.context = @ptrCast(&metrics.zeam_compact_attestations_time_seconds);
lean_block_proposal_attestation_data_selected.context = @ptrCast(&metrics.lean_block_proposal_attestation_data_selected);
lean_block_proposal_aggregates_selected.context = @ptrCast(&metrics.lean_block_proposal_aggregates_selected);
lean_tick_interval_duration_seconds.context = @ptrCast(&metrics.lean_tick_interval_duration_seconds);
zeam_xev_clock_until_done_drain_seconds.context = @ptrCast(&metrics.zeam_xev_clock_until_done_drain_seconds);
zeam_fork_choice_tick_interval_duration_seconds.context = @ptrCast(&metrics.zeam_fork_choice_tick_interval_duration_seconds);
Expand Down
Loading
Loading