Kafka Integration
Learn how to trace Kafka queue operations with Sentry.
Sentry's Kafka integration lets you trace both production and consumption. In Spring Boot, this happens automatically. If you're using raw kafka-clients, you'll need to instrument producers and consumers with sentry-kafka.
Once configured, queue spans will appear in Sentry's Queues dashboard.
Kafka queue tracing is available in Sentry Java SDK version 8.41.0 and later.
If you're using Spring Kafka (KafkaTemplate / @KafkaListener), Sentry instruments your producers and consumers automatically. If you're using kafka-clients directly, see the Java Kafka docs for manual instrumentation with sentry-kafka.
Add sentry-kafka alongside your existing Spring Kafka dependency:
implementation 'io.sentry:sentry-kafka:8.43.1'
implementation 'org.springframework.kafka:spring-kafka'
<dependency>
  <groupId>io.sentry</groupId>
  <artifactId>sentry-kafka</artifactId>
  <version>8.43.1</version>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
Enable queue tracing in your application properties:
sentry.dsn=___DSN___
sentry.enable-queue-tracing=true
sentry.traces-sample-rate=1.0
sentry:
  dsn: ___DSN___
  enable-queue-tracing: true
  traces-sample-rate: 1.0
Now every KafkaTemplate.send(...) call produces a queue.publish span if there's a transaction running, and record-based @KafkaListener methods produce queue.process transactions. Sentry injects propagation headers (sentry-trace, baggage) into outgoing records and continues the trace on the consumer side.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/order")
    public String placeOrder() {
        // Sentry automatically records a queue.publish span for this send.
        kafkaTemplate.send("orders", "order-payload");
        return "ok";
    }
}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class OrderController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    @GetMapping("/order")
    fun placeOrder(): String {
        // Sentry automatically records a queue.publish span for this send.
        kafkaTemplate.send("orders", "order-payload")
        return "ok"
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void onOrder(ConsumerRecord<String, String> record) {
        // Sentry automatically records a queue.process transaction for this method.
        processOrder(record.value());
    }

    private void processOrder(String payload) {
        // Placeholder for your application-specific order handling.
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-group")
    fun onOrder(record: ConsumerRecord<String, String>) {
        // Sentry automatically records a queue.process transaction for this method.
        processOrder(record.value())
    }

    private fun processOrder(payload: String) {
        // Placeholder for your application-specific order handling.
    }
}
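To make the propagation step more concrete, here is a minimal sketch of reading a sentry-trace header value. It assumes the commonly documented layout of a 32-character hex trace ID, a 16-character hex span ID, and an optional sampled flag, joined by hyphens. The class `SentryTraceHeaderSketch` and its methods are hypothetical helpers for illustration only; they are not part of the Sentry SDK, which handles this parsing for you.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative parser for a sentry-trace header value; not part of the Sentry SDK. */
final class SentryTraceHeaderSketch {
    // Assumed layout: <32 hex trace id>-<16 hex span id>[-<sampled flag, 0 or 1>]
    private static final Pattern FORMAT =
        Pattern.compile("^([0-9a-f]{32})-([0-9a-f]{16})(?:-([01]))?$");

    /** Returns the trace ID portion, or null if the value does not match the assumed layout. */
    static String traceId(String headerValue) {
        Matcher m = FORMAT.matcher(headerValue.trim());
        return m.matches() ? m.group(1) : null;
    }

    /** Returns the explicit sampling decision, or null if it is absent or the value is malformed. */
    static Boolean sampled(String headerValue) {
        Matcher m = FORMAT.matcher(headerValue.trim());
        if (!m.matches() || m.group(3) == null) {
            return null;
        }
        return "1".equals(m.group(3));
    }

    public static void main(String[] args) {
        // Made-up example value, not taken from a real trace.
        String value = "771a43a4192642f0b136d5159a501700-7c51afd529da4a2a-1";
        System.out.println("trace id: " + traceId(value));
        System.out.println("sampled:  " + sampled(value));
    }
}
```

A consumer that continues the trace ends up in the same trace ID as the producer, which is what links the queue.publish span to the queue.process transaction.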
Sentry sets messaging.message.retry.count from Spring Kafka's kafka_deliveryAttempt header. Spring Kafka only adds this header when delivery attempt headers are enabled and the listener uses an error handler or after-rollback processor that supports delivery attempts.
The following example uses Spring Kafka's DefaultErrorHandler. If your app already uses retry handling that supports delivery attempts, keep your existing handler and enable the header on the listener container:
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
DefaultErrorHandler kafkaErrorHandler() {
    return new DefaultErrorHandler(new FixedBackOff(0L, 1L));
}

@Bean
ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>
        kafkaContainerCustomizer() {
    return container -> container.getContainerProperties().setDeliveryAttemptHeader(true);
}
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ContainerCustomizer
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Bean
fun kafkaErrorHandler(): DefaultErrorHandler {
    return DefaultErrorHandler(FixedBackOff(0L, 1L))
}

@Bean
fun kafkaContainerCustomizer() =
    ContainerCustomizer<Any, Any, ConcurrentMessageListenerContainer<Any, Any>> { container ->
        container.containerProperties.setDeliveryAttemptHeader(true)
    }
Without that header, Sentry does not set messaging.message.retry.count.
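As a rough illustration of how a delivery attempt could map to a retry count, the sketch below decodes the header value and subtracts one. It rests on two assumptions that you should verify against your Spring Kafka and Sentry versions: that kafka_deliveryAttempt is encoded as a 4-byte big-endian integer, and that the first delivery is attempt 1, so the number of previous attempts is the attempt minus one. `DeliveryAttemptSketch` is a hypothetical helper, not a Sentry or Spring Kafka API.

```java
import java.nio.ByteBuffer;

/** Illustrative decoding of a delivery-attempt header value; not part of any SDK. */
final class DeliveryAttemptSketch {
    /** Decodes a 4-byte big-endian int, the encoding assumed here for kafka_deliveryAttempt. */
    static int deliveryAttempt(byte[] headerValue) {
        if (headerValue == null || headerValue.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected a 4-byte header value");
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(headerValue).getInt();
    }

    /** Assumed mapping: attempt 1 is the first delivery, so retries = attempt - 1 (never negative). */
    static int retryCount(byte[] headerValue) {
        return Math.max(0, deliveryAttempt(headerValue) - 1);
    }

    public static void main(String[] args) {
        byte[] thirdAttempt = ByteBuffer.allocate(Integer.BYTES).putInt(3).array();
        System.out.println(retryCount(thirdAttempt)); // prints 2
    }
}
```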
| Attribute | Type | Description |
|---|---|---|
| messaging.system | string | Always "kafka" |
| messaging.destination.name | string | Kafka topic name |
| messaging.message.id | string | Value of the messaging.message.id record header, if present |
| messaging.message.body.size | int | Serialized value size in bytes |
| messaging.message.retry.count | int | Number of previous delivery attempts (from Kafka's kafka_deliveryAttempt header), if present |
| messaging.message.receive.latency | int | Time in milliseconds between the producer sending the record and the consumer starting to process it |
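
To show what two of these attributes measure, here is a small sketch that computes a body size and a receive latency by hand. It assumes a UTF-8 string serializer and epoch-millisecond timestamps; how Sentry obtains the send time internally is not covered here. `QueueAttributeSketch` and its methods are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative calculations for two queue attributes; not how the SDK is implemented. */
final class QueueAttributeSketch {
    /** Size in bytes of a string value, assuming it is serialized as UTF-8. */
    static int bodySizeBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Milliseconds between the send time and the start of processing, clamped at zero for clock skew. */
    static long receiveLatencyMillis(long sentAtEpochMillis, long processingStartEpochMillis) {
        return Math.max(0L, processingStartEpochMillis - sentAtEpochMillis);
    }

    public static void main(String[] args) {
        System.out.println(bodySizeBytes("order-payload"));                          // prints 13
        System.out.println(receiveLatencyMillis(1_700_000_000_000L, 1_700_000_001_500L)); // prints 1500
    }
}
```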
- Async listeners not supported. @KafkaListener methods that return a CompletableFuture or Mono/Flux are not instrumented correctly; use synchronous listeners.
- Batch listeners not supported. @KafkaListener methods that consume batches, such as ConsumerRecords<?, ?> or List<ConsumerRecord<...>>, are not instrumented yet.
- Spring Boot auto-instrumentation is disabled when using Sentry OpenTelemetry integrations.
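
One possible way to work within the async limitation is to keep the listener method synchronous and block on any asynchronous work inside it, so the work completes before the listener returns. The sketch below uses plain Java to stay self-contained; in a Spring application, `onOrder` would carry the @KafkaListener annotation and return void. `processAsync` and the `handled` list are hypothetical stand-ins for your own code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative synchronous wrapper around async work; the names here are hypothetical. */
final class SynchronousListenerSketch {
    // Collects results so the effect of the listener can be observed.
    static final List<String> handled = new CopyOnWriteArrayList<>();

    /** Hypothetical asynchronous processing step. */
    static CompletableFuture<String> processAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> "processed:" + payload);
    }

    /** Synchronous listener body: blocks until the async work finishes, then returns nothing. */
    static void onOrder(String payload) {
        String result = processAsync(payload).join();
        handled.add(result);
    }

    public static void main(String[] args) {
        onOrder("order-payload");
        System.out.println(handled); // prints [processed:order-payload]
    }
}
```

Blocking ties up the listener thread for the duration of the work, so this trades throughput for complete tracing; whether that is acceptable depends on your workload.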