From 6099047681aee67838706730054ff768f057d2a1 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 1 Apr 2026 16:24:52 +0200 Subject: [PATCH 1/4] feat(spring-jakarta): Add Kafka consumer instrumentation Add SentryKafkaRecordInterceptor that creates queue.process transactions for incoming Kafka records. Forks scopes per record, extracts sentry-trace and baggage headers for distributed tracing via continueTrace, and calculates messaging.message.receive.latency from the enqueued-time header. Composes with existing RecordInterceptor via delegation. Span lifecycle is managed through success/failure callbacks. Add SentryKafkaConsumerBeanPostProcessor to register the interceptor on ConcurrentKafkaListenerContainerFactory beans. Co-Authored-By: Claude --- .../api/sentry-spring-jakarta.api | 15 ++ .../SentryKafkaConsumerBeanPostProcessor.java | 61 ++++++ .../kafka/SentryKafkaRecordInterceptor.java | 201 +++++++++++++++++ ...entryKafkaConsumerBeanPostProcessorTest.kt | 58 +++++ .../kafka/SentryKafkaRecordInterceptorTest.kt | 202 ++++++++++++++++++ 5 files changed, 537 insertions(+) create mode 100644 sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java create mode 100644 sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java create mode 100644 sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt create mode 100644 sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index bc95af0859..c5ca7444c0 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -244,6 +244,12 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { public fun ()V public fun getOrder ()I @@ -254,6 +260,15 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : o public fun (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V + public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V +} + public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java new file mode 100644 index 0000000000..0fd52aa6c4 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -0,0 +1,61 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.ScopesAdapter; +import java.lang.reflect.Field; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} + * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. + */ +@ApiStatus.Internal +public final class SentryKafkaConsumerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof AbstractKafkaListenerContainerFactory) { + final @NotNull AbstractKafkaListenerContainerFactory factory = + (AbstractKafkaListenerContainerFactory) bean; + + final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + if (existing instanceof SentryKafkaRecordInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final RecordInterceptor sentryInterceptor = + new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); + factory.setRecordInterceptor(sentryInterceptor); + } + return bean; + } + + @SuppressWarnings("unchecked") + private @Nullable RecordInterceptor getExistingInterceptor( + final @NotNull AbstractKafkaListenerContainerFactory factory) { + try { + final @NotNull Field field = + AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); + field.setAccessible(true); + return (RecordInterceptor) field.get(factory); + } catch (NoSuchFieldException | IllegalAccessException e) { + return null; + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java new file mode 100644 index 0000000000..419e7834a1 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -0,0 +1,201 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka + * records with distributed tracing support. + */ +@ApiStatus.Internal +public final class SentryKafkaRecordInterceptor implements RecordInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer"; + + private final @NotNull IScopes scopes; + private final @Nullable RecordInterceptor delegate; + + private static final @NotNull ThreadLocal currentContext = + new ThreadLocal<>(); + + public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) { + this(scopes, null); + } + + public SentryKafkaRecordInterceptor( + final @NotNull IScopes scopes, final @Nullable RecordInterceptor delegate) { + this.scopes = scopes; + this.delegate = delegate; + } + + @Override + public @Nullable ConsumerRecord intercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (!scopes.getOptions().isEnableQueueTracing()) { + return delegateIntercept(record, consumer); + } + + final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); + final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); + + continueTrace(forkedScopes, record); + + final @Nullable ITransaction transaction = startTransaction(forkedScopes, record); + currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); + + return delegateIntercept(record, consumer); + } + + @Override + public void success( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.success(record, consumer); + } + } finally { + finishSpan(SpanStatus.OK, null); + } + } + + @Override + public void failure( + final @NotNull ConsumerRecord record, + final @NotNull Exception exception, + final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.failure(record, exception, consumer); + } + } finally { + finishSpan(SpanStatus.INTERNAL_ERROR, exception); + } + } + + @Override + public void afterRecord( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.afterRecord(record, consumer); + } + } + + private @Nullable ConsumerRecord delegateIntercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + return delegate.intercept(record, consumer); + } + return record; + } + + private void continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = + forkedScopes.startTransaction( + new TransactionContext("queue.process", "queue.process"), txOptions); + + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, "messaging.message.id"); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr != null) { + try { + final long enqueuedTime = Long.parseLong(enqueuedTimeStr); + final long latencyMs = System.currentTimeMillis() - enqueuedTime; + if (latencyMs >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); + } + } catch (NumberFormatException ignored) { + // ignore malformed header + } + } + + return transaction; + } + + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { + final @Nullable SentryRecordContext ctx = currentContext.get(); + if (ctx == null) { + return; + } + currentContext.remove(); + + try { + final @Nullable ITransaction transaction = ctx.transaction; + if (transaction != null) { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } + } finally { + ctx.lifecycleToken.close(); + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } + + private static final class SentryRecordContext { + final @NotNull ISentryLifecycleToken lifecycleToken; + final @Nullable ITransaction transaction; + + SentryRecordContext( + final @NotNull ISentryLifecycleToken lifecycleToken, + final @Nullable ITransaction transaction) { + this.lifecycleToken = lifecycleToken; + this.transaction = transaction; + } + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt new file mode 100644 index 0000000000..8595cb9ae7 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -0,0 +1,58 @@ +package io.sentry.spring.jakarta.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.mockito.kotlin.mock +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory + +class SentryKafkaConsumerBeanPostProcessorTest { + + @Test + fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + // Verify via reflection that the interceptor was set + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val interceptor = field.get(factory) + assertTrue(interceptor is SentryKafkaRecordInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + // First wrap + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val firstInterceptor = field.get(factory) + + // Second wrap — should be idempotent + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + val secondInterceptor = field.get(factory) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not wrap non-factory beans`() { + val someBean = "not a factory" + val processor = SentryKafkaConsumerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt new file mode 100644 index 0000000000..9b92f19749 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -0,0 +1,202 @@ +package io.sentry.spring.jakarta.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.TransactionContext +import java.nio.charset.StandardCharsets +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.listener.RecordInterceptor + +class SentryKafkaRecordInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + private lateinit var consumer: Consumer + private lateinit var lifecycleToken: ISentryLifecycleToken + + @BeforeTest + fun setup() { + scopes = mock() + consumer = mock() + lifecycleToken = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + whenever(scopes.options).thenReturn(options) + whenever(scopes.isEnabled).thenReturn(true) + + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + } + + private fun createRecord( + topic: String = "my-topic", + headers: RecordHeaders = RecordHeaders(), + ): ConsumerRecord { + val record = ConsumerRecord(topic, 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + private fun createRecordWithHeaders( + sentryTrace: String? = null, + baggage: String? = null, + enqueuedTime: Long? = null, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER, + it.toString().toByteArray(StandardCharsets.UTF_8), + ) + } + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + @Test + fun `intercept creates forked scopes`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(scopes).forkedScopes("SentryKafkaRecordInterceptor") + } + + @Test + fun `intercept continues trace from headers`() { + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) + + interceptor.intercept(record, consumer) + + verify(forkedScopes) + .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) + } + + @Test + fun `intercept calls continueTrace with null when no headers`() { + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(scopes, never()).forkedScopes(any()) + assertEquals(record, result) + } + + @Test + fun `delegates to existing interceptor`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + interceptor.intercept(record, consumer) + + verify(delegate).intercept(record, consumer) + } + + @Test + fun `success finishes transaction and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + // intercept first to set up context + interceptor.intercept(record, consumer) + interceptor.success(record, consumer) + + verify(delegate).success(record, consumer) + } + + @Test + fun `failure finishes transaction with error and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + val exception = RuntimeException("processing failed") + + interceptor.intercept(record, consumer) + interceptor.failure(record, exception, consumer) + + verify(delegate).failure(record, exception, consumer) + } + + @Test + fun `afterRecord delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.afterRecord(record, consumer) + + verify(delegate).afterRecord(record, consumer) + } + + @Test + fun `trace origin is set correctly`() { + assertEquals( + "auto.queue.spring_jakarta.kafka.consumer", + SentryKafkaRecordInterceptor.TRACE_ORIGIN, + ) + } +} From 1f000270e0e66c62e012ced26b81b72309d53478 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 1 Apr 2026 16:31:34 +0200 Subject: [PATCH 2/4] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fc8eb10d9..cb5772b3d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add Kafka consumer instrumentation for Spring Boot 3 ([#5255](https://github.com/getsentry/sentry-java/pull/5255)) - Add Kafka producer instrumentation for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)) - Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) - Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136)) From be3a2ba5ff3d7c58408cb9eefe42c4cf5e5d86d9 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Thu, 9 Apr 2026 13:16:25 +0200 Subject: [PATCH 3/4] fix(spring-jakarta): Update consumer references and add reflection warning log Update SentryKafkaRecordInterceptor and its test to reference SentryProducerInterceptor instead of the removed SentryKafkaProducerWrapper. Add a warning log in SentryKafkaConsumerBeanPostProcessor when reflection fails to read the existing RecordInterceptor, so users know their custom interceptor may not be chained. Co-Authored-By: Claude --- .../kafka/SentryKafkaConsumerBeanPostProcessor.java | 10 ++++++++++ .../jakarta/kafka/SentryKafkaRecordInterceptor.java | 2 +- .../jakarta/kafka/SentryKafkaRecordInterceptorTest.kt | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index 0fd52aa6c4..f272a575cb 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -1,6 +1,7 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; import java.lang.reflect.Field; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -50,6 +51,15 @@ public final class SentryKafkaConsumerBeanPostProcessor field.setAccessible(true); return (RecordInterceptor) field.get(factory); } catch (NoSuchFieldException | IllegalAccessException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Unable to read existing recordInterceptor from " + + "AbstractKafkaListenerContainerFactory via reflection. " + + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", + e); return null; } } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 419e7834a1..11c0301b2e 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -141,7 +141,7 @@ private void continueTrace( } final @Nullable String enqueuedTimeStr = - headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { try { final long enqueuedTime = Long.parseLong(enqueuedTimeStr); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 9b92f19749..a6baf246e5 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -74,7 +74,7 @@ class SentryKafkaRecordInterceptorTest { } enqueuedTime?.let { headers.add( - SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER, + SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, it.toString().toByteArray(StandardCharsets.UTF_8), ) } From f92f47c91c094e1da53ceca68e9bf479c83ce356 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Thu, 9 Apr 2026 14:35:54 +0200 Subject: [PATCH 4/4] fix(spring-jakarta): Initialize Sentry in consumer test, fix API file ordering Add initForTest/close to SentryKafkaRecordInterceptorTest to fix NPE from TransactionContext constructor requiring initialized Sentry. Regenerate API file to fix alphabetical ordering of SentryProducerInterceptor entry. Co-Authored-By: Claude --- .../api/sentry-spring-jakarta.api | 16 ++++++++-------- .../kafka/SentryKafkaRecordInterceptorTest.kt | 9 +++++++++ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index 3e84a0a50f..57d46f05bc 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -256,14 +256,6 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostPro public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; } -public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { - public fun (Lio/sentry/IScopes;)V - public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V - public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; -} - public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { public fun (Lio/sentry/IScopes;)V public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V @@ -273,6 +265,14 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V } +public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V + public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; +} + public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index a6baf246e5..370da75585 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -3,11 +3,14 @@ package io.sentry.spring.jakarta.kafka import io.sentry.BaggageHeader import io.sentry.IScopes import io.sentry.ISentryLifecycleToken +import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader import io.sentry.SentryTracer import io.sentry.TransactionContext +import io.sentry.test.initForTest import java.nio.charset.StandardCharsets +import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals @@ -30,6 +33,7 @@ class SentryKafkaRecordInterceptorTest { @BeforeTest fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } scopes = mock() consumer = mock() lifecycleToken = mock() @@ -51,6 +55,11 @@ class SentryKafkaRecordInterceptorTest { whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) } + @AfterTest + fun teardown() { + Sentry.close() + } + private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(),