Skip to content

Commit 6fe5273

Browse files
authored
Merge 1e293c6 into cac8c1f
2 parents cac8c1f + 1e293c6 commit 6fe5273

17 files changed

Lines changed: 771 additions & 487 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
### Fixes
1818

19+
- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338))
1920
- Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328))
2021

2122
## 8.37.1

sentry-kafka/api/sentry-kafka.api

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing {
99
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
1010
}
1111

12-
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
12+
public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer {
1313
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
1414
public static final field TRACE_ORIGIN Ljava/lang/String;
15-
public fun <init> ()V
16-
public fun <init> (Lio/sentry/IScopes;)V
17-
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
15+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;)V
16+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V
17+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V
18+
public fun abortTransaction ()V
19+
public fun beginTransaction ()V
20+
public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid;
1821
public fun close ()V
19-
public fun configure (Ljava/util/Map;)V
20-
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
21-
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
22+
public fun close (Ljava/time/Duration;)V
23+
public fun commitTransaction ()V
24+
public fun flush ()V
25+
public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer;
26+
public fun initTransactions ()V
27+
public fun metrics ()Ljava/util/Map;
28+
public fun partitionsFor (Ljava/lang/String;)Ljava/util/List;
29+
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future;
30+
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future;
31+
public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V
32+
public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V
33+
public fun toString ()Ljava/lang/String;
2234
}
2335

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private void finishTransaction(
241241

242242
private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
243243
final @Nullable String enqueuedTimeStr =
244-
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
244+
headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER);
245245
if (enqueuedTimeStr == null) {
246246
return null;
247247
}
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package io.sentry.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
5+
import io.sentry.IScopes;
6+
import io.sentry.ISpan;
7+
import io.sentry.ScopesAdapter;
8+
import io.sentry.SentryLevel;
9+
import io.sentry.SentryTraceHeader;
10+
import io.sentry.SpanDataConvention;
11+
import io.sentry.SpanOptions;
12+
import io.sentry.SpanStatus;
13+
import io.sentry.util.SpanUtils;
14+
import io.sentry.util.TracingUtils;
15+
import java.nio.charset.StandardCharsets;
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.Future;
21+
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
22+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
23+
import org.apache.kafka.clients.producer.Callback;
24+
import org.apache.kafka.clients.producer.Producer;
25+
import org.apache.kafka.clients.producer.ProducerRecord;
26+
import org.apache.kafka.clients.producer.RecordMetadata;
27+
import org.apache.kafka.common.Metric;
28+
import org.apache.kafka.common.MetricName;
29+
import org.apache.kafka.common.PartitionInfo;
30+
import org.apache.kafka.common.TopicPartition;
31+
import org.apache.kafka.common.Uuid;
32+
import org.apache.kafka.common.errors.ProducerFencedException;
33+
import org.apache.kafka.common.header.Header;
34+
import org.apache.kafka.common.header.Headers;
35+
import org.jetbrains.annotations.ApiStatus;
36+
import org.jetbrains.annotations.NotNull;
37+
import org.jetbrains.annotations.Nullable;
38+
39+
/**
40+
* Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
41+
* and to inject Sentry trace propagation headers into the produced record.
42+
*
43+
* <p>Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the
44+
* span open until the send callback fires, so the span reflects the actual broker-ack lifecycle.
45+
*
46+
* <p>For raw Kafka usage:
47+
*
48+
* <pre>{@code
49+
* Producer<String, String> producer =
50+
* new SentryKafkaProducer<>(new KafkaProducer<>(props));
51+
* }</pre>
52+
*
53+
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
54+
* sentry-spring-jakarta} installs this wrapper automatically via {@code
55+
* ProducerFactory.addPostProcessor(...)}.
56+
*/
57+
@ApiStatus.Experimental
58+
public final class SentryKafkaProducer<K, V> implements Producer<K, V> {
59+
60+
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
61+
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
62+
63+
private final @NotNull Producer<K, V> delegate;
64+
private final @NotNull IScopes scopes;
65+
private final @NotNull String traceOrigin;
66+
67+
public SentryKafkaProducer(final @NotNull Producer<K, V> delegate) {
68+
this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN);
69+
}
70+
71+
public SentryKafkaProducer(
72+
final @NotNull Producer<K, V> delegate, final @NotNull IScopes scopes) {
73+
this(delegate, scopes, TRACE_ORIGIN);
74+
}
75+
76+
public SentryKafkaProducer(
77+
final @NotNull Producer<K, V> delegate,
78+
final @NotNull IScopes scopes,
79+
final @NotNull String traceOrigin) {
80+
this.delegate = delegate;
81+
this.scopes = scopes;
82+
this.traceOrigin = traceOrigin;
83+
}
84+
85+
/** Returns the wrapped producer. */
86+
public @NotNull Producer<K, V> getDelegate() {
87+
return delegate;
88+
}
89+
90+
@Override
91+
public @NotNull Future<RecordMetadata> send(final @NotNull ProducerRecord<K, V> record) {
92+
return send(record, null);
93+
}
94+
95+
@Override
96+
public @NotNull Future<RecordMetadata> send(
97+
final @NotNull ProducerRecord<K, V> record, final @Nullable Callback callback) {
98+
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
99+
return delegate.send(record, callback);
100+
}
101+
102+
final @Nullable ISpan activeSpan = scopes.getSpan();
103+
if (activeSpan == null || activeSpan.isNoOp()) {
104+
maybeInjectHeaders(record.headers(), null);
105+
return delegate.send(record, callback);
106+
}
107+
108+
final @NotNull SpanOptions spanOptions = new SpanOptions();
109+
spanOptions.setOrigin(traceOrigin);
110+
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
111+
112+
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
113+
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
114+
maybeInjectHeaders(record.headers(), span);
115+
116+
try {
117+
return delegate.send(record, wrapCallback(callback, span));
118+
} catch (Throwable t) {
119+
finishWithError(span, t);
120+
throw t;
121+
}
122+
}
123+
124+
private @NotNull Callback wrapCallback(
125+
final @Nullable Callback userCallback, final @NotNull ISpan span) {
126+
return (metadata, exception) -> {
127+
try {
128+
if (exception != null) {
129+
span.setThrowable(exception);
130+
span.setStatus(SpanStatus.INTERNAL_ERROR);
131+
} else {
132+
span.setStatus(SpanStatus.OK);
133+
}
134+
} catch (Throwable t) {
135+
scopes
136+
.getOptions()
137+
.getLogger()
138+
.log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
139+
} finally {
140+
span.finish();
141+
if (userCallback != null) {
142+
userCallback.onCompletion(metadata, exception);
143+
}
144+
}
145+
};
146+
}
147+
148+
private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
149+
span.setThrowable(t);
150+
span.setStatus(SpanStatus.INTERNAL_ERROR);
151+
span.finish();
152+
}
153+
154+
private boolean isIgnored() {
155+
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
156+
}
157+
158+
private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) {
159+
try {
160+
final @Nullable List<String> existingBaggageHeaders =
161+
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
162+
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
163+
TracingUtils.trace(scopes, existingBaggageHeaders, span);
164+
if (tracingHeaders != null) {
165+
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
166+
headers.remove(sentryTraceHeader.getName());
167+
headers.add(
168+
sentryTraceHeader.getName(),
169+
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
170+
171+
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
172+
if (baggageHeader != null) {
173+
headers.remove(baggageHeader.getName());
174+
headers.add(
175+
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
176+
}
177+
}
178+
179+
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
180+
headers.add(
181+
SENTRY_ENQUEUED_TIME_HEADER,
182+
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
183+
.toString()
184+
.getBytes(StandardCharsets.UTF_8));
185+
} catch (Throwable t) {
186+
scopes
187+
.getOptions()
188+
.getLogger()
189+
.log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t);
190+
}
191+
}
192+
193+
private static @Nullable List<String> readHeaderValues(
194+
final @NotNull Headers headers, final @NotNull String name) {
195+
@Nullable List<String> values = null;
196+
for (final @NotNull Header header : headers.headers(name)) {
197+
final byte @Nullable [] value = header.value();
198+
if (value != null) {
199+
if (values == null) {
200+
values = new ArrayList<>();
201+
}
202+
values.add(new String(value, StandardCharsets.UTF_8));
203+
}
204+
}
205+
return values;
206+
}
207+
208+
// --- Pure delegation for everything else ---
209+
210+
@Override
211+
public void initTransactions() {
212+
delegate.initTransactions();
213+
}
214+
215+
@Override
216+
public void beginTransaction() throws ProducerFencedException {
217+
delegate.beginTransaction();
218+
}
219+
220+
@Override
221+
@SuppressWarnings("deprecation")
222+
public void sendOffsetsToTransaction(
223+
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
224+
final @NotNull String consumerGroupId)
225+
throws ProducerFencedException {
226+
delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
227+
}
228+
229+
@Override
230+
public void sendOffsetsToTransaction(
231+
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
232+
final @NotNull ConsumerGroupMetadata groupMetadata)
233+
throws ProducerFencedException {
234+
delegate.sendOffsetsToTransaction(offsets, groupMetadata);
235+
}
236+
237+
@Override
238+
public void commitTransaction() throws ProducerFencedException {
239+
delegate.commitTransaction();
240+
}
241+
242+
@Override
243+
public void abortTransaction() throws ProducerFencedException {
244+
delegate.abortTransaction();
245+
}
246+
247+
@Override
248+
public void flush() {
249+
delegate.flush();
250+
}
251+
252+
@Override
253+
public @NotNull List<PartitionInfo> partitionsFor(final @NotNull String topic) {
254+
return delegate.partitionsFor(topic);
255+
}
256+
257+
@Override
258+
public @NotNull Map<MetricName, ? extends Metric> metrics() {
259+
return delegate.metrics();
260+
}
261+
262+
@Override
263+
public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) {
264+
return delegate.clientInstanceId(timeout);
265+
}
266+
267+
@Override
268+
public void close() {
269+
delegate.close();
270+
}
271+
272+
@Override
273+
public void close(final @NotNull Duration timeout) {
274+
delegate.close(timeout);
275+
}
276+
277+
@Override
278+
public @NotNull String toString() {
279+
return "SentryKafkaProducer[delegate=" + delegate + "]";
280+
}
281+
}

0 commit comments

Comments
 (0)