Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

### Fixes

- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338))
- Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328))

## 8.37.1
Expand Down
26 changes: 19 additions & 7 deletions sentry-kafka/api/sentry-kafka.api
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing {
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
}

public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer {
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> ()V
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V
public fun abortTransaction ()V
public fun beginTransaction ()V
public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid;
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
public fun close (Ljava/time/Duration;)V
public fun commitTransaction ()V
public fun flush ()V
public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer;
public fun initTransactions ()V
public fun metrics ()Ljava/util/Map;
public fun partitionsFor (Ljava/lang/String;)Ljava/util/List;
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future;
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future;
public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V
public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V
public fun toString ()Ljava/lang/String;
}

Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void finishTransaction(

private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
final @Nullable String enqueuedTimeStr =
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER);
if (enqueuedTimeStr == null) {
return null;
}
Expand Down
281 changes: 281 additions & 0 deletions sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
import io.sentry.IScopes;
import io.sentry.ISpan;
import io.sentry.ScopesAdapter;
import io.sentry.SentryLevel;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanOptions;
import io.sentry.SpanStatus;
import io.sentry.util.SpanUtils;
import io.sentry.util.TracingUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
 * and to inject Sentry trace propagation headers into the produced record.
 *
 * <p>Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the
 * span open until the send callback fires, so the span reflects the actual broker-ack lifecycle.
 *
 * <p>For raw Kafka usage:
 *
 * <pre>{@code
 * Producer<String, String> producer =
 *     new SentryKafkaProducer<>(new KafkaProducer<>(props));
 * }</pre>
 *
 * <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
 * sentry-spring-jakarta} installs this wrapper automatically via {@code
 * ProducerFactory.addPostProcessor(...)}.
 *
 * <p>Thread-safety: this wrapper holds no mutable state of its own; it is as thread-safe as the
 * wrapped producer (Kafka producers are documented thread-safe).
 */
@ApiStatus.Experimental
public final class SentryKafkaProducer<K, V> implements Producer<K, V> {

  /** Span origin recorded on every span this wrapper creates; also used for ignore-filtering. */
  public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";

  /**
   * Header carrying the produce timestamp in epoch seconds as a plain decimal string, consumed by
   * the Sentry Kafka consumer integration (and cross-SDK consumers such as sentry-python) to
   * compute receive latency.
   */
  public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";

  private final @NotNull Producer<K, V> delegate;
  private final @NotNull IScopes scopes;
  private final @NotNull String traceOrigin;

  /** Wraps {@code delegate} using the global scopes and the default {@link #TRACE_ORIGIN}. */
  public SentryKafkaProducer(final @NotNull Producer<K, V> delegate) {
    this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN);
  }

  /** Wraps {@code delegate} with explicit scopes and the default {@link #TRACE_ORIGIN}. */
  public SentryKafkaProducer(
      final @NotNull Producer<K, V> delegate, final @NotNull IScopes scopes) {
    this(delegate, scopes, TRACE_ORIGIN);
  }

  /**
   * Wraps {@code delegate} with explicit scopes and a custom span origin.
   *
   * @param delegate the real producer all operations are forwarded to
   * @param scopes the Sentry scopes used to resolve the active span and options
   * @param traceOrigin the origin recorded on created spans and matched against ignored origins
   */
  public SentryKafkaProducer(
      final @NotNull Producer<K, V> delegate,
      final @NotNull IScopes scopes,
      final @NotNull String traceOrigin) {
    this.delegate = delegate;
    this.scopes = scopes;
    this.traceOrigin = traceOrigin;
  }

  /** Returns the wrapped producer. */
  public @NotNull Producer<K, V> getDelegate() {
    return delegate;
  }

  @Override
  public @NotNull Future<RecordMetadata> send(final @NotNull ProducerRecord<K, V> record) {
    return send(record, null);
  }

  /**
   * Sends the record, recording a {@code queue.publish} span when queue tracing is enabled and an
   * active span exists. Trace headers are injected even without an active span so downstream
   * consumers can continue the trace (e.g. for background workers and scheduled jobs).
   */
  @Override
  public @NotNull Future<RecordMetadata> send(
      final @NotNull ProducerRecord<K, V> record, final @Nullable Callback callback) {
    if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
      return delegate.send(record, callback);
    }

    final @Nullable ISpan activeSpan = scopes.getSpan();
    if (activeSpan == null || activeSpan.isNoOp()) {
      // No span to parent under: still propagate trace headers from the scope's
      // propagation context so distributed tracing works without an active transaction.
      maybeInjectHeaders(record.headers(), null);
      return delegate.send(record, callback);
    }

    final @NotNull SpanOptions spanOptions = new SpanOptions();
    spanOptions.setOrigin(traceOrigin);
    final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);

    span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
    maybeInjectHeaders(record.headers(), span);

    try {
      // The span is finished from the (wrapped) callback so it covers the broker-ack lifecycle.
      return delegate.send(record, wrapCallback(callback, span));
    } catch (Throwable t) {
      // send() threw synchronously (e.g. serialization failure): the callback will never fire,
      // so finish the span here and rethrow.
      finishWithError(span, t);
      throw t;
    }
  }

  /**
   * Wraps the user callback so the span status is set from the send outcome and the span is always
   * finished before the user callback runs.
   */
  private @NotNull Callback wrapCallback(
      final @Nullable Callback userCallback, final @NotNull ISpan span) {
    return (metadata, exception) -> {
      try {
        if (exception != null) {
          span.setThrowable(exception);
          span.setStatus(SpanStatus.INTERNAL_ERROR);
        } else {
          span.setStatus(SpanStatus.OK);
        }
      } catch (Throwable t) {
        // Never let instrumentation break the producer's callback chain.
        scopes
            .getOptions()
            .getLogger()
            .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
      } finally {
        span.finish();
        if (userCallback != null) {
          userCallback.onCompletion(metadata, exception);
        }
      }
    };
  }

  /** Marks the span failed with {@code t} and finishes it (synchronous-failure path). */
  private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
    span.setThrowable(t);
    span.setStatus(SpanStatus.INTERNAL_ERROR);
    span.finish();
  }

  /** Whether this wrapper's origin is listed in {@code ignoredSpanOrigins}. */
  private boolean isIgnored() {
    return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
  }

  /**
   * Injects {@code sentry-trace}, {@code baggage} and {@link #SENTRY_ENQUEUED_TIME_HEADER} into the
   * record headers, replacing any pre-existing values. Failures are logged, never propagated, so
   * instrumentation can never break the actual send.
   *
   * @param headers the record's mutable headers
   * @param span the publish span to continue the trace from, or {@code null} to derive headers
   *     from the current scope (NOTE(review): exact fallback semantics live in
   *     {@code TracingUtils.trace} — not visible here)
   */
  private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) {
    try {
      final @Nullable List<String> existingBaggageHeaders =
          readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
      final @Nullable TracingUtils.TracingHeaders tracingHeaders =
          TracingUtils.trace(scopes, existingBaggageHeaders, span);
      if (tracingHeaders != null) {
        final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
        // remove-then-add: Kafka Headers.add appends, so drop stale values first.
        headers.remove(sentryTraceHeader.getName());
        headers.add(
            sentryTraceHeader.getName(),
            sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));

        final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
        if (baggageHeader != null) {
          headers.remove(baggageHeader.getName());
          headers.add(
              baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
        }
      }

      headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
      // toPlainString (not toString): BigDecimal.toString may render scientific notation
      // depending on scale, which cross-SDK consumers (e.g. sentry-python) cannot parse.
      headers.add(
          SENTRY_ENQUEUED_TIME_HEADER,
          DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
              .toPlainString()
              .getBytes(StandardCharsets.UTF_8));
    } catch (Throwable t) {
      scopes
          .getOptions()
          .getLogger()
          .log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t);
    }
  }

  /**
   * Collects all UTF-8 values for the header {@code name}, or {@code null} when none exist.
   * Headers with a {@code null} byte value are skipped.
   */
  private static @Nullable List<String> readHeaderValues(
      final @NotNull Headers headers, final @NotNull String name) {
    @Nullable List<String> values = null;
    for (final @NotNull Header header : headers.headers(name)) {
      final byte @Nullable [] value = header.value();
      if (value != null) {
        if (values == null) {
          values = new ArrayList<>();
        }
        values.add(new String(value, StandardCharsets.UTF_8));
      }
    }
    return values;
  }

  // --- Pure delegation for everything else ---

  @Override
  public void initTransactions() {
    delegate.initTransactions();
  }

  @Override
  public void beginTransaction() throws ProducerFencedException {
    delegate.beginTransaction();
  }

  @Override
  @SuppressWarnings("deprecation")
  public void sendOffsetsToTransaction(
      final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
      final @NotNull String consumerGroupId)
      throws ProducerFencedException {
    delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
  }

  @Override
  public void sendOffsetsToTransaction(
      final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
      final @NotNull ConsumerGroupMetadata groupMetadata)
      throws ProducerFencedException {
    delegate.sendOffsetsToTransaction(offsets, groupMetadata);
  }

  @Override
  public void commitTransaction() throws ProducerFencedException {
    delegate.commitTransaction();
  }

  @Override
  public void abortTransaction() throws ProducerFencedException {
    delegate.abortTransaction();
  }

  @Override
  public void flush() {
    delegate.flush();
  }

  @Override
  public @NotNull List<PartitionInfo> partitionsFor(final @NotNull String topic) {
    return delegate.partitionsFor(topic);
  }

  @Override
  public @NotNull Map<MetricName, ? extends Metric> metrics() {
    return delegate.metrics();
  }

  @Override
  public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) {
    return delegate.clientInstanceId(timeout);
  }

  @Override
  public void close() {
    delegate.close();
  }

  @Override
  public void close(final @NotNull Duration timeout) {
    delegate.close(timeout);
  }

  @Override
  public @NotNull String toString() {
    return "SentryKafkaProducer[delegate=" + delegate + "]";
  }
}
Loading
Loading