Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/queuePopulator.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require('../lib/otel');

const async = require('async');
const schedule = require('node-schedule');

Expand Down
1 change: 1 addition & 0 deletions extensions/gc/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../lib/otel');

const { errors } = require('arsenal');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/bucketProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
33 changes: 32 additions & 1 deletion extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const {
startCircuitBreakerMetricsExport,
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');
const { context: otelContext, trace, SpanKind, SpanStatusCode } =
require('@opentelemetry/api');
const { traceHeadersFromCurrentContext } =
require('../../../lib/tracing/kafkaTraceContext');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
Expand Down Expand Up @@ -309,7 +313,11 @@ class LifecycleConductor {
}

_taskToMessage(task, taskVersion, log) {
return {
// Attach traceparent so the bucket-processor consumer becomes a
// child of the conductor.scan root span. Returns undefined when
// OTEL is disabled (no active span).
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = {
message: JSON.stringify({
action: 'processObjects',
contextInfo: {
Expand All @@ -324,6 +332,8 @@ class LifecycleConductor {
details: {},
}),
};
if (headers) kafkaEntry.headers = headers;
return kafkaEntry;
}

_getAccountIds(unknownCanonicalIds, log, cb) {
Expand Down Expand Up @@ -371,6 +381,27 @@ class LifecycleConductor {
}

processBuckets(cb) {
// Wrap every cron firing in its own root trace so bucket-listing
// and downstream bucket-task produces appear as children. The
// trace is INTERNAL because there's no upstream HTTP/Kafka parent.
const tracer = trace.getTracer('backbeat');
const span = tracer.startSpan('lifecycle.conductor.scan', {
kind: SpanKind.INTERNAL,
});
const ctx = trace.setSpan(otelContext.active(), span);
otelContext.with(ctx, () => {
this._processBucketsInternal((err, res) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
if (cb) cb(err, res);
});
});
}

_processBucketsInternal(cb) {
const log = this.logger.newRequestLogger();
const start = new Date();
let nBucketsQueued = 0;
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/conductor/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/objectProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
22 changes: 20 additions & 2 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const ReplicationAPI = require('../../replication/ReplicationAPI');
const { LifecycleMetrics, LIFECYCLE_MARKER_METRICS_LOCATION } = require('../LifecycleMetrics');
const locationsConfig = require('../../../conf/locationConfig.json') || {};
const { rulesSupportTransition } = require('../util/rules');
const {
linkHeadersFromCurrentContext,
traceHeadersFromCurrentContext,
} = require('../../../lib/tracing/kafkaTraceContext');
const { decode } = versioning.VersionID;

const errorTransitionInProgress = errors.InternalError.
Expand Down Expand Up @@ -121,7 +125,14 @@ class LifecycleTask extends BackbeatTask {
* @return {undefined}
*/
_sendBucketEntry(entry, cb) {
const entries = [{ message: JSON.stringify(entry) }];
// Bucket-task re-queue: keep normal parent-child semantics so the
// continuation stays in the same trace as the originating conductor
// scan. Re-queues happen rarely (only when a bucket is too big to
// finish in one batch) so the trace stays small.
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: JSON.stringify(entry) };
if (headers) kafkaEntry.headers = headers;
const entries = [kafkaEntry];
this.producer.sendToTopic(this.bucketTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'BucketTopic', 'bucket', err, 1);
return cb(err);
Expand Down Expand Up @@ -183,7 +194,14 @@ class LifecycleTask extends BackbeatTask {
location,
Date.now() - entry.getAttribute('transitionTime'));

const entries = [{ message: entry.toKafkaMessage() }];
// Attach link-headers (NOT traceparent). The object-processor
// consumer starts a NEW root trace that links back to this
// bucket-processor span — prevents 1M-object traces from breaking
// the Jaeger UI.
const headers = linkHeadersFromCurrentContext();
const kafkaEntry = { message: entry.toKafkaMessage() };
if (headers) kafkaEntry.headers = headers;
const entries = [kafkaEntry];
this.producer.sendToTopic(this.objectTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'ObjectTopic', 'bucket', err, 1);
return cb(err);
Expand Down
10 changes: 9 additions & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const messageUtil = require('./utils/message');
const notifConstants = require('./constants');
const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class NotificationQueuePopulator extends QueuePopulatorExtension {
/**
Expand Down Expand Up @@ -290,13 +291,20 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
eventTime: message.dateTime,
matchingConfig,
});
// Propagate the oplog entry's trace context as
// Kafka headers so the notification-processor
// consumer becomes a child of the original S3
// trace (same pattern as ReplicationQueuePopulator).
const traceHeaders = traceHeadersFromEntry(value);
this.publish(topic,
// keeping all messages for same object
// in the same partition to keep the order.
// here we use the object name and not the
// "_id" which also includes the versionId
`${bucket}/${message.key}`,
JSON.stringify(message));
JSON.stringify(message),
undefined,
traceHeaders);
// keep track of internal topics we have pushed to
pushedToTopic[topic] = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,22 @@ class KafkaNotificationDestination extends NotificationDestination {
*/
send(messages, done) {
const starTime = Date.now();
this._notificationProducer.send(messages, error => {
// Trust boundary: strip any trace headers before producing to the
// external customer Kafka destination. There's no ingress-layer
// strip on the Kafka protocol (unlike HTTP via nginx), so the only
// place to drop headers is at the producer. Keeps everything else
// on the message intact.
const safeMessages = Array.isArray(messages)
? messages.map(m => {
if (m && m.headers) {
// eslint-disable-next-line no-unused-vars
const { headers, ...rest } = m;
return rest;
}
return m;
})
: messages;
this._notificationProducer.send(safeMessages, error => {
if (error) {
const { host, topic } = this._destinationConfig;
this._log.error('error in message delivery to external Kafka destination', {
Expand Down
2 changes: 2 additions & 0 deletions extensions/notification/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/otel');

const assert = require('assert');
const { errors } = require('arsenal');
const async = require('async');
Expand Down
8 changes: 8 additions & 0 deletions extensions/replication/ReplicationAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const locations = require('../../conf/locationConfig.json') || {};

const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');
const { linkHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');

let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
Expand Down Expand Up @@ -78,6 +79,13 @@ class ReplicationAPI {
key: `${bucket}/${key}`,
message: action.toKafkaMessage(),
};
// Fan-out transitions can produce many entries per bucket-processor
// span. Use link-headers so the data-mover consumer starts a new
// root trace rather than a child of this one. If no active OTEL
// span (OTEL disabled), linkHeadersFromCurrentContext returns
// undefined and we ship the entry without headers.
const linkHeaders = linkHeadersFromCurrentContext();
if (linkHeaders) kafkaEntry.headers = linkHeaders;
let topic = dataMoverTopic;
const toLocation = action.getAttribute('toLocation');
const locationConfig = locations[toLocation];
Expand Down
7 changes: 6 additions & 1 deletion extensions/replication/ReplicationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const locationsConfig = require('../../conf/locationConfig.json') || {};
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class ReplicationQueuePopulator extends QueuePopulatorExtension {
constructor(params) {
Expand Down Expand Up @@ -95,11 +96,15 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;

const traceHeaders = traceHeadersFromEntry(value);

this.log.trace('publishing object replication entry',
{ entry: queueEntry.getLogInfo() });
this.publish(this.repConfig.topic,
`${queueEntry.getBucket()}/${queueEntry.getObjectKey()}`,
JSON.stringify(publishedEntry));
JSON.stringify(publishedEntry),
undefined,
traceHeaders);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/replication/replicationStatusProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const werelogs = require('werelogs');

Expand Down
37 changes: 36 additions & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@ const {
}
} = require('./constants');

const { context: otelContext, SpanStatusCode } = require('@opentelemetry/api');
const {
startSpanFromKafkaEntry,
startLinkedSpanFromKafkaEntry,
} = require('./tracing/kafkaTraceContext');

/**
* Detect whether a Kafka entry carries link-traceparent (fan-out break)
* rather than standard traceparent. Link semantics mean the consumer
* starts a NEW root trace that references the upstream via OTEL Link,
* instead of becoming a child of it.
*/
/**
 * Detect whether a Kafka entry carries a link-traceparent header
 * (fan-out break) rather than a standard traceparent. Link semantics
 * mean the consumer starts a NEW root trace that references the
 * upstream span via an OTEL Link, instead of becoming a child of it.
 *
 * @param {Object} entry - Kafka message entry; may carry a `headers`
 *   array of single-key objects (node-rdkafka header convention)
 * @return {Boolean} true when any header object holds a truthy
 *   `link-traceparent` value, false otherwise
 */
function entryHasLinkHeaders(entry) {
    if (!entry || !Array.isArray(entry.headers)) {
        return false;
    }
    // Guard each element: a malformed headers array containing
    // null/undefined entries must not throw while being probed.
    return entry.headers.some(h => Boolean(h && h['link-traceparent']));
}

const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

Expand Down Expand Up @@ -514,7 +534,22 @@ class BackbeatConsumer extends EventEmitter {
const { topic, partition } = entry;
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);

this._queueProcessor(entry, (err, completionArgs) => done(err, completionArgs, finishProcessingTask));
const { ctx, span } = entryHasLinkHeaders(entry)
? startLinkedSpanFromKafkaEntry(entry, `${topic}.process`)
: startSpanFromKafkaEntry(entry, `${topic}.process`);
span.setAttribute('messaging.kafka.topic', topic);
span.setAttribute('messaging.kafka.partition', partition);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding messaging.kafka.consumer_group — it helps distinguish spans from different consumer groups in Jaeger (e.g. replication vs lifecycle processors running in the same cluster).

```suggestion
span.setAttribute('messaging.kafka.topic', topic);
span.setAttribute('messaging.kafka.partition', partition);
span.setAttribute('messaging.kafka.consumer_group', this._groupId);

otelContext.with(ctx, () => {
this._queueProcessor(entry, (err, completionArgs) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordException only adds an event to the span — it does not set the span status. Without calling span.setStatus({ code: SpanStatusCode.ERROR }), failed spans will still appear as OK in Jaeger. Add span.setStatus({ code: 2 }) (SpanStatusCode.ERROR) alongside recordException. — Claude Code

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordException adds an event to the span but does not mark the span as failed. Without setStatus, error spans will appear as successful in Jaeger/Grafana Tempo. Add span.setStatus({ code: 2, message: err.message }) alongside recordException.

Suggested change
span.end();
if (err) {
span.recordException(err);
span.setStatus({ code: 2, message: err.message });
}

— Claude Code

done(err, completionArgs, finishProcessingTask);
});
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _queueProcessor throws synchronously, span.end() is never called and the span leaks. Wrap the call in try/catch to ensure cleanup:

Suggested change:

    otelContext.with(ctx, () => {
        try {
            this._queueProcessor(entry, (err, completionArgs) => {
                if (err) {
                    span.recordException(err);
                    span.setStatus({ code: SpanStatusCode.ERROR });
                }
                span.end();
                done(err, completionArgs, finishProcessingTask);
            });
        } catch (err) {
            span.recordException(err);
            span.setStatus({ code: SpanStatusCode.ERROR });
            span.end();
            throw err;
        }

— Claude Code

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _queueProcessor throws synchronously (e.g. TypeError from a null entry field), the span created at line 537 is never ended — it leaks. Wrap the _queueProcessor call in try/catch to ensure span.end() is always called. Same pattern applies to LifecycleConductor.processBuckets (line 393).

— Claude Code

}

/**
Expand Down
3 changes: 2 additions & 1 deletion lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class BackbeatProducer extends EventEmitter {
Buffer.from(item.message), // value
item.key, // key (for keyed partitioning)
Date.now(), // timestamp
sendCtx // opaque
sendCtx, // opaque
item.headers || undefined // Kafka message headers
);
});
} catch (err) {
Expand Down
Loading
Loading