Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
child.process(record);

if (child.isTerminalNode()) {
streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
streamTask.maybeRecordE2ELatency(record.timestamp(), child.name());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,

private long processTimeMs = 0L;

// Cached wall-clock time for terminal-node e2e latency; -1 means not yet read for the current record.
private long currentRecordE2ELatencyTimeMs = -1L;

private final Sensor closeTaskSensor;
private final Sensor processRatioSensor;
private final Sensor processLatencySensor;
Expand Down Expand Up @@ -1021,6 +1024,7 @@ private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(currNode);
processorContext.setSystemTimeMs(wallClockTime);
currentRecordE2ELatencyTimeMs = -1L;
}

/**
Expand Down Expand Up @@ -1289,6 +1293,22 @@ void maybeRecordE2ELatency(final long recordTimestamp, final long now, final Str
}
}

/**
* Records e2e latency for a terminal node, measured against a wall-clock reading that is taken
* lazily inside this method (via {@code time.milliseconds()}) rather than supplied by the caller.
* <p>
* The reading is stored in {@code currentRecordE2ELatencyTimeMs} and reused by later calls for as
* long as that field stays non-negative, so the clock is read at most once between resets of the field.
* Nothing is recorded, and the clock is not read, unless the sensor both should record and has metrics.
*
* @param recordTimestamp timestamp of the record; subtracted from the wall-clock reading to get the latency
* @param nodeName name of the node whose sensor is looked up in {@code e2eLatencySensors}
* @throws IllegalStateException if no e2e latency sensor is registered under {@code nodeName}
*/
void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
if (e2eLatencySensor == null) {
throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
} else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
// A negative value is the "not yet read" sentinel: read the clock once and cache it for reuse.
if (currentRecordE2ELatencyTimeMs < 0) {
currentRecordE2ELatencyTimeMs = time.milliseconds();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern as on the other PR for this ticket:

#21618 (review)

}
// Latency is (cached wall-clock time - record timestamp); the cached time is also passed as the sensor's record time.
e2eLatencySensor.record(currentRecordE2ELatencyTimeMs - recordTimestamp, currentRecordE2ELatencyTimeMs);
}
}

/**
* Request committing the current task's state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ public void process(final Record<Integer, Integer> record) {
final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName);

// e2e latency = 10
time.sleep(10L);
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
task.process(10L);

Expand Down Expand Up @@ -845,6 +846,7 @@ public void process(final Record<Integer, Integer> record) {


// e2e latency = 23
time.sleep(13L);
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(2, 0L)));
task.process(23L);

Expand Down Expand Up @@ -872,6 +874,53 @@ public void process(final Record<Integer, Integer> record) {
assertThat(terminalMax.metricValue(), equalTo(23.0));
}

@Test
public void shouldIncludeProcessingDelayInTerminalNodeE2ELatency() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    // Mock clock starts at 0 and only moves when the test explicitly sleeps.
    time = new MockTime(0L, 0L, 0L);
    metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);

    final long simulatedWorkMs = 5L;

    // Source node that "works" for simulatedWorkMs of wall-clock time before handing the record downstream.
    final MockSourceNode<Integer, Integer> slowSource = new MockSourceNode<>(intDeserializer, intDeserializer) {
        InternalProcessorContext<Integer, Integer> ctx;

        @Override
        public void init(final InternalProcessorContext<Integer, Integer> context) {
            ctx = context;
            super.init(context);
        }

        @Override
        public void process(final Record<Integer, Integer> record) {
            time.sleep(simulatedWorkMs);
            ctx.forward(record);
        }
    };

    task = createStatelessTaskWithForwardingTopology(slowSource);
    task.initializeIfNeeded();
    task.completeRestoration(noOpResetter -> { });

    final String taskIdString = task.id().toString();
    final Metric sourceMax =
        getProcessorMetric("record-e2e-latency", "%s-max", taskIdString, slowSource.name());
    final Metric terminalMax =
        getProcessorMetric("record-e2e-latency", "%s-max", taskIdString, processorStreamTime.name());

    // The record carries timestamp 0 and reaches the source node once the wall clock shows 10.
    time.sleep(10L);
    task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
    final long wallClockAtProcess = time.milliseconds();
    task.process(wallClockAtProcess);

    // Source node: latency covers only record timestamp -> arrival at the source.
    assertThat(sourceMax.metricValue(), equalTo(10.0));
    // Terminal node: latency additionally covers the time spent processing inside the topology.
    assertThat(terminalMax.metricValue(), equalTo(10.0 + simulatedWorkMs));
}

@Test
public void shouldRecordRestoredRecords() {
when(stateManager.taskId()).thenReturn(taskId);
Expand Down