Skip to content

Commit f2a32a7

Browse files
piochelepiotrclaude
andcommitted
Add Kafka cluster ID, topic, consumer group, and offset tags to spans
Tag producer spans with messaging.kafka.cluster.id, topic, and broker-assigned partition/offset (via callback). Tag consumer spans with messaging.kafka.cluster.id, topic, consumer group, and offset. These tags enable linking from a span directly to a specific Kafka message. Previously the cluster ID was only extracted when Data Streams was enabled; MetadataState is now always initialized on the producer so cluster ID flows through regardless of DSM configuration. tag: ai generated Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9f89a0b commit f2a32a7

12 files changed

Lines changed: 49 additions & 18 deletions

File tree

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3-
import datadog.trace.api.Config;
43
import datadog.trace.bootstrap.ContextStore;
54
import datadog.trace.instrumentation.kafka_common.MetadataState;
65
import org.apache.kafka.clients.Metadata;
@@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1615
public static String extractClusterId(
1716
KafkaConsumerInfo kafkaConsumerInfo,
1817
ContextStore<Metadata, MetadataState> metadataContextStore) {
19-
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
18+
if (kafkaConsumerInfo != null) {
2019
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
2120
if (consumerMetadata != null) {
2221
MetadataState state = metadataContextStore.get(consumerMetadata);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
@@ -117,6 +118,7 @@ public void onConsume(
117118
final AgentSpan span,
118119
final ConsumerRecord record,
119120
String consumerGroup,
121+
String clusterId,
120122
String bootstrapServers) {
121123
if (record != null) {
122124
final String topic = record.topic() == null ? "kafka" : record.topic();
@@ -127,7 +129,9 @@ public void onConsume(
127129
if (consumerGroup != null) {
128130
span.setTag(CONSUMER_GROUP, consumerGroup);
129131
}
130-
132+
if (clusterId != null) {
133+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
134+
}
131135
if (bootstrapServers != null) {
132136
span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
133137
}
@@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) {
152156
}
153157

154158
public void onProduce(
155-
final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) {
159+
final AgentSpan span,
160+
final ProducerRecord record,
161+
final ProducerConfig producerConfig,
162+
final String clusterId) {
156163
if (record != null) {
157164
if (record.partition() != null) {
158165
span.setTag(PARTITION, record.partition());
@@ -163,6 +170,9 @@ public void onProduce(
163170
PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent(
164171
producerConfig, BOOTSTRAP_SERVERS_JOINER));
165172
}
173+
if (clusterId != null) {
174+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
175+
}
166176
final String topic = record.topic() == null ? "kafka" : record.topic();
167177
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
168178
span.setTag(MESSAGING_DESTINATION_NAME, topic);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

33
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
46
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
57

68
import datadog.trace.api.datastreams.DataStreamsTags;
@@ -30,6 +32,10 @@ public KafkaProducerCallback(
3032

3133
@Override
3234
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
35+
if (metadata != null) {
36+
span.setTag(PARTITION, metadata.partition());
37+
span.setTag(OFFSET, metadata.offset());
38+
}
3339
PRODUCER_DECORATE.onError(span, exception);
3440
PRODUCER_DECORATE.beforeFinish(span);
3541
span.finish();

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static AgentScope onEnter(
167167
callbackParentSpan = localActiveSpan;
168168
}
169169
PRODUCER_DECORATE.afterStart(span);
170-
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
170+
PRODUCER_DECORATE.onProduce(span, record, producerConfig, clusterId);
171171

172172
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
173173

@@ -267,10 +267,10 @@ public static class ProducerConstructorAdvice {
267267
public static void captureConfiguration(
268268
@Advice.FieldValue("metadata") Metadata metadata,
269269
@Advice.Argument(0) ProducerConfig producerConfig) {
270+
MetadataState state =
271+
InstrumentationContext.get(Metadata.class, MetadataState.class)
272+
.putIfAbsent(metadata, MetadataState::new);
270273
if (Config.get().isDataStreamsEnabled()) {
271-
MetadataState state =
272-
InstrumentationContext.get(Metadata.class, MetadataState.class)
273-
.putIfAbsent(metadata, MetadataState::new);
274274
KafkaConfigHelper.storePendingProducerConfig(
275275
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
276276
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
142142
span.setTag(InstrumentationTags.TOMBSTONE, true);
143143
}
144144
decorator.afterStart(span);
145-
decorator.onConsume(span, val, group, bootstrapServers);
145+
decorator.onConsume(span, val, group, clusterId, bootstrapServers);
146146
if (InstrumenterConfig.get().isLegacyContextManagerEnabled()) {
147147
activateNext(span);
148148
} else {

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3-
import datadog.trace.api.Config;
43
import datadog.trace.bootstrap.ContextStore;
54
import datadog.trace.instrumentation.kafka_common.MetadataState;
65
import org.apache.kafka.clients.Metadata;
@@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1615
public static String extractClusterId(
1716
KafkaConsumerInfo kafkaConsumerInfo,
1817
ContextStore<Metadata, MetadataState> metadataContextStore) {
19-
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
18+
if (kafkaConsumerInfo != null) {
2019
Metadata metadata = kafkaConsumerInfo.getmetadata().get();
2120
if (metadata != null) {
2221
MetadataState state = metadataContextStore.get(metadata);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
@@ -117,6 +118,7 @@ public void onConsume(
117118
final AgentSpan span,
118119
final ConsumerRecord record,
119120
String consumerGroup,
121+
String clusterId,
120122
String bootstrapServers) {
121123
if (record != null) {
122124
final String topic = record.topic() == null ? "kafka" : record.topic();
@@ -127,7 +129,9 @@ public void onConsume(
127129
if (consumerGroup != null) {
128130
span.setTag(CONSUMER_GROUP, consumerGroup);
129131
}
130-
132+
if (clusterId != null) {
133+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
134+
}
131135
if (bootstrapServers != null) {
132136
span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
133137
}
@@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) {
152156
}
153157

154158
public void onProduce(
155-
final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) {
159+
final AgentSpan span,
160+
final ProducerRecord record,
161+
final ProducerConfig producerConfig,
162+
final String clusterId) {
156163
if (record != null) {
157164
if (record.partition() != null) {
158165
span.setTag(PARTITION, record.partition());
@@ -163,6 +170,9 @@ public void onProduce(
163170
PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent(
164171
producerConfig, BOOTSTRAP_SERVERS_JOINER));
165172
}
173+
if (clusterId != null) {
174+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
175+
}
166176
final String topic = record.topic() == null ? "kafka" : record.topic();
167177
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
168178
span.setTag(MESSAGING_DESTINATION_NAME, topic);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

33
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
46
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.PRODUCER_DECORATE;
57

68
import datadog.trace.api.datastreams.DataStreamsTags;
@@ -29,6 +31,10 @@ public KafkaProducerCallback(
2931

3032
@Override
3133
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
34+
if (metadata != null) {
35+
span.setTag(PARTITION, metadata.partition());
36+
span.setTag(OFFSET, metadata.offset());
37+
}
3238
PRODUCER_DECORATE.onError(span, exception);
3339
PRODUCER_DECORATE.beforeFinish(span);
3440
span.finish();

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static AgentScope onEnter(
5656
callbackParentSpan = localActiveSpan;
5757
}
5858
PRODUCER_DECORATE.afterStart(span);
59-
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
59+
PRODUCER_DECORATE.onProduce(span, record, producerConfig, clusterId);
6060

6161
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
6262

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ public class ProducerConstructorAdvice {
1414
public static void captureConfiguration(
1515
@Advice.FieldValue("metadata") Metadata metadata,
1616
@Advice.Argument(0) ProducerConfig producerConfig) {
17+
MetadataState state =
18+
InstrumentationContext.get(Metadata.class, MetadataState.class)
19+
.putIfAbsent(metadata, MetadataState::new);
1720
if (Config.get().isDataStreamsEnabled()) {
18-
MetadataState state =
19-
InstrumentationContext.get(Metadata.class, MetadataState.class)
20-
.putIfAbsent(metadata, MetadataState::new);
2121
KafkaConfigHelper.storePendingProducerConfig(
2222
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
2323
}

0 commit comments

Comments
 (0)