From 4d04de0b088b7c168ac4f1e64f1e2de7297bf28d Mon Sep 17 00:00:00 2001 From: Alan Lau Date: Mon, 1 Jun 2026 19:32:33 -0400 Subject: [PATCH 1/2] KAFKA-14597: record-e2e-latency-max only considers consumption latencies but not processing delays --- .../internals/ProcessorContextImpl.java | 2 +- .../processor/internals/StreamTask.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index a0ca68753107a..b5fc1baaf8c8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -293,7 +293,7 @@ private void forwardInternal(final ProcessorNode child, child.process(record); if (child.isTerminalNode()) { - streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name()); + streamTask.maybeRecordE2ELatency(record.timestamp(), child.name()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3b65e677674d6..986608d7a9796 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -98,6 +98,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private long processTimeMs = 0L; + // Cached wall-clock time for terminal-node e2e latency; -1 means not yet read for the current record. 
+ private long currentRecordE2ELatencyTimeMs = -1L; + private final Sensor closeTaskSensor; private final Sensor processRatioSensor; private final Sensor processLatencySensor; @@ -1021,6 +1024,7 @@ private void updateProcessorContext(final ProcessorNode currNode, processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(currNode); processorContext.setSystemTimeMs(wallClockTime); + currentRecordE2ELatencyTimeMs = -1L; } /** @@ -1289,6 +1293,22 @@ void maybeRecordE2ELatency(final long recordTimestamp, final long now, final Str } } + /** + * Records e2e latency for a terminal node against a wall-clock time that is read lazily on the first + * recording call and reused until updateProcessorContext resets it, so upstream processing delay is included. + */ + void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) { + final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName); + if (e2eLatencySensor == null) { + throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName); + } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { + if (currentRecordE2ELatencyTimeMs < 0) { + currentRecordE2ELatencyTimeMs = time.milliseconds(); + } + e2eLatencySensor.record(currentRecordE2ELatencyTimeMs - recordTimestamp, currentRecordE2ELatencyTimeMs); + } + } + /** * Request committing the current task's state */ From e3ace21eb355a6fe28fc56a0018551ba5500e9d6 Mon Sep 17 00:00:00 2001 From: Alan Lau Date: Tue, 2 Jun 2026 09:45:19 -0400 Subject: [PATCH 2/2] Update tests --- .../processor/internals/StreamTaskTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 48639fad16ac2..ffea896040c57 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -817,6 +817,7 @@ public void process(final Record record) { final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName); // e2e latency = 10 + time.sleep(10L); task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L))); task.process(10L); @@ -845,6 +846,7 @@ public void process(final Record record) { // e2e latency = 23 + time.sleep(13L); task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(2, 0L))); task.process(23L); @@ -872,6 +874,53 @@ public void process(final Record record) { assertThat(terminalMax.metricValue(), equalTo(23.0)); } + @Test + public void shouldIncludeProcessingDelayInTerminalNodeE2ELatency() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + time = new MockTime(0L, 0L, 0L); + metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); + + final long processingDelay = 5L; + + // A source node that simulates processing work by advancing wall-clock time before forwarding + final MockSourceNode delayingSourceNode = new MockSourceNode<>(intDeserializer, intDeserializer) { + InternalProcessorContext context; + + @Override + public void init(final InternalProcessorContext context) { + this.context = context; + super.init(context); + } + + @Override + public void process(final Record record) { + time.sleep(processingDelay); + context.forward(record); + } + }; + + task = createStatelessTaskWithForwardingTopology(delayingSourceNode); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + final String sourceNodeName = delayingSourceNode.name(); + final String terminalNodeName = processorStreamTime.name(); + + final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNodeName); + final 
Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName); + + // record (timestamp 0) is observed by the source node at wall-clock time 10 + time.sleep(10L); + task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L))); + task.process(time.milliseconds()); + + // source node measures only consumption latency (record timestamp -> source observed it) + assertThat(sourceMax.metricValue(), equalTo(10.0)); + // terminal node measures full end-to-end latency, including the processing delay + assertThat(terminalMax.metricValue(), equalTo(10.0 + processingDelay)); + } + @Test public void shouldRecordRestoredRecords() { when(stateManager.taskId()).thenReturn(taskId);