Kafka Integration
Learn how to trace Kafka queue operations with Sentry.
Sentry's Kafka integration lets you trace both production and consumption. In Spring Boot, this happens automatically. If you're using raw kafka-clients, you'll need to instrument producers and consumers with sentry-kafka.
Once configured, queue spans will appear in Sentry's Queues dashboard.
Kafka queue tracing is available in Sentry Java SDK version 8.41.0 and later.
For applications using kafka-clients directly (without Spring), use the sentry-kafka module. If you're using Kafka through Spring Boot, use the Spring Boot Kafka docs instead.
implementation 'io.sentry:sentry-kafka:8.43.1'
<dependency>
<groupId>io.sentry</groupId>
<artifactId>sentry-kafka</artifactId>
<version>8.43.1</version>
</dependency>
libraryDependencies += "io.sentry" % "sentry-kafka" % "8.43.1"
For other dependency managers, use the same Maven coordinates: io.sentry:sentry-kafka.
Enable queue tracing when initializing Sentry:
Sentry.init(options -> {
    options.setDsn("___DSN___");
    options.setTracesSampleRate(1.0);
    options.setEnableQueueTracing(true);
});
Sentry.init { options ->
    options.dsn = "___DSN___"
    options.tracesSampleRate = 1.0
    options.isEnableQueueTracing = true
}
Wrap your KafkaProducer with SentryKafkaProducer.wrap(). Every send() call then records a queue.publish span and injects Sentry propagation headers into the record.
import io.sentry.kafka.SentryKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// producerProps is your existing Kafka producer configuration.
KafkaProducer<String, String> rawProducer = new KafkaProducer<>(producerProps);

// Wrap once, then use the wrapped producer for every send().
Producer<String, String> producer = SentryKafkaProducer.wrap(rawProducer);
producer.send(new ProducerRecord<>("orders", "order-payload"));
import io.sentry.kafka.SentryKafkaProducer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
val rawProducer = KafkaProducer<String, String>(producerProps)
val producer = SentryKafkaProducer.wrap(rawProducer)
producer.send(ProducerRecord("orders", "order-payload"))
A queue.publish span is created only when there is an active transaction in scope. Sentry trace headers are always injected (even without an active span) so the consumer can continue the trace.
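To make the header propagation above more concrete, here is a minimal sketch of reading a trace header value from a record. It assumes the injected header is named sentry-trace and uses the traceId-spanId-sampled layout that Sentry SDKs use for distributed tracing in general; this page does not list the exact header names. The SentryTraceHeaderSketch and TraceParts names are hypothetical and are not part of sentry-kafka. Kafka header values are byte arrays, so the sketch decodes them as UTF-8 first.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the Sentry SDK.
final class SentryTraceHeaderSketch {

    /** Parsed parts of a sentry-trace value; sampled is null when the flag is absent. */
    static final class TraceParts {
        final String traceId;
        final String spanId;
        final Boolean sampled;

        TraceParts(String traceId, String spanId, Boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    /** Assumes the layout "<32 hex trace id>-<16 hex span id>[-<0|1>]". */
    static TraceParts parse(byte[] headerValue) {
        String value = new String(headerValue, StandardCharsets.UTF_8).trim();
        String[] parts = value.split("-");
        boolean shapeOk = parts.length >= 2 && parts.length <= 3
                && parts[0].length() == 32
                && parts[1].length() == 16;
        if (!shapeOk) {
            throw new IllegalArgumentException("Not a sentry-trace value: " + value);
        }
        Boolean sampled = parts.length == 3 ? Boolean.valueOf("1".equals(parts[2])) : null;
        return new TraceParts(parts[0], parts[1], sampled);
    }

    public static void main(String[] args) {
        // Made-up IDs; a real value would come from the record's headers.
        byte[] raw = "0123456789abcdef0123456789abcdef-0123456789abcdef-1"
                .getBytes(StandardCharsets.UTF_8);
        TraceParts parts = parse(raw);
        System.out.println(parts.traceId + " " + parts.spanId + " " + parts.sampled);
    }
}
```

In normal use you should not need to parse these headers yourself, since the consumer-side helper continues the trace for you. The sketch is mainly useful when inspecting records during debugging.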
Wrap each record's processing callback with SentryKafkaConsumerTracing.withTracing(). This creates a queue.process transaction per record, continues the distributed trace from producer headers, and calculates receive latency automatically.
If you're also using OpenTelemetry Kafka instrumentation, don't instrument the same consumer callback with withTracing(). This helper is not automatically suppressed under OpenTelemetry today, so using both can create duplicate queue.process transactions.
import io.sentry.kafka.SentryKafkaConsumerTracing;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// consumerProps is your existing consumer configuration; running is your own shutdown flag.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(List.of("orders"));
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            // One queue.process transaction is created per record.
            SentryKafkaConsumerTracing.withTracing(record, () -> {
                processOrder(record.value());
            });
        }
    }
}
import io.sentry.kafka.SentryKafkaConsumerTracing
import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer

KafkaConsumer<String, String>(consumerProps).use { consumer ->
    consumer.subscribe(listOf("orders"))
    while (running) {
        val records = consumer.poll(Duration.ofMillis(500))
        for (record in records) {
            // One queue.process transaction is created per record.
            SentryKafkaConsumerTracing.withTracing(record) {
                processOrder(record.value())
            }
        }
    }
}
Use the Callable overload when your processing code throws checked exceptions:
SentryKafkaConsumerTracing.withTracing(record, () -> {
    return processOrder(record.value()); // can throw checked exceptions
});
import java.util.concurrent.Callable

SentryKafkaConsumerTracing.withTracing(
    record,
    Callable {
        processOrder(record.value())
    },
)
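The reason a returning lambda selects the Callable overload comes from Java's overload resolution, not from anything specific to Sentry. The sketch below shows this with two hypothetical stand-in methods named traced; they only mimic the Runnable and Callable shapes and are not the SDK's actual signatures. A block lambda that returns no value can only match Runnable, while a block lambda that returns a value can only match Callable, whose call() method is allowed to throw checked exceptions.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration; not the sentry-kafka API.
final class OverloadSketch {

    // Runnable shape: the body cannot throw checked exceptions.
    static String traced(Runnable body) {
        body.run();
        return "Runnable";
    }

    // Callable shape: call() declares "throws Exception".
    static <T> String traced(Callable<T> body) throws Exception {
        body.call();
        return "Callable";
    }

    // Example handler that declares a checked exception.
    static int processOrder(String payload) throws IOException {
        if (payload.isEmpty()) {
            throw new IOException("empty payload");
        }
        return payload.length();
    }

    // Reports which overload the compiler picked for each lambda shape.
    static String whichOverloads() {
        StringBuilder log = new StringBuilder();
        try {
            // No return value: only the Runnable overload applies.
            String first = traced(() -> { log.append("handled"); });
            // Returns a value and may throw IOException: only Callable applies.
            String second = traced(() -> { return processOrder("order-payload"); });
            return first + "," + second;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(whichOverloads()); // prints Runnable,Callable
    }
}
```

Calling processOrder from the first, Runnable-shaped lambda would not compile, because Runnable.run() does not declare any checked exceptions.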
| Attribute | Type | Description |
|---|---|---|
| messaging.system | string | Always "kafka" |
| messaging.destination.name | string | Kafka topic name |
| messaging.message.id | string | Value of the messaging.message.id record header, if present |
| messaging.message.body.size | int | Serialized value size in bytes |
| messaging.message.retry.count | int | Number of previous delivery attempts (from Kafka's kafka_deliveryAttempt header), if present |
| messaging.message.receive.latency | int | Time in milliseconds between the producer sending the record and the consumer starting to process it |
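As an illustration of the messaging.message.receive.latency definition above, the following sketch computes the milliseconds between a send time and the start of processing. It only demonstrates the arithmetic and is not the SDK's implementation. The ReceiveLatencySketch class is hypothetical, and clamping negative values to zero (to absorb clock skew between hosts) is an assumption rather than documented behavior.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration; not part of the Sentry SDK.
final class ReceiveLatencySketch {

    /**
     * Milliseconds between the producer sending a record and the consumer
     * starting to process it. Negative results are clamped to zero, which is
     * an assumption made here to tolerate clock skew.
     */
    static long receiveLatencyMillis(Instant sentAt, Instant processingStartedAt) {
        long millis = Duration.between(sentAt, processingStartedAt).toMillis();
        return Math.max(0L, millis);
    }

    public static void main(String[] args) {
        Instant sentAt = Instant.parse("2024-01-01T00:00:00.000Z");
        Instant startedAt = Instant.parse("2024-01-01T00:00:01.250Z");
        System.out.println(receiveLatencyMillis(sentAt, startedAt)); // prints 1250
    }
}
```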
- Async listeners not supported. @KafkaListener methods that return a CompletableFuture or Mono/Flux are not instrumented correctly; use synchronous listeners.
- Batch listeners not supported. @KafkaListener methods that consume batches, such as ConsumerRecords<?, ?> or List<ConsumerRecord<...>>, are not instrumented yet.
- Spring Boot auto-instrumentation is disabled when using Sentry OpenTelemetry integrations.