Skip to content

Commit 7b4f50c

Browse files
johannbotha and claude authored
Fix missing consumer_group and topic tags in DSM Kafka 3.8+ metrics (#10464)
The kafka-clients-3.8 instrumentation was calling Optional.get() without checking isPresent(), causing NoSuchElementException when the consumer group is null. This resulted in missing consumer_group and topic tags in data_streams.kafka.lag_seconds metrics.

Changes:
- Replace .get() with .orElse(null) in DDOffsetCommitCallback and ConsumerCoordinatorAdvice to safely handle empty Optionals.
- Fix incorrect null checks in ConstructorAdvice and LegacyConstructorAdvice to use isPresent() instead of checking if the Optional object itself is null.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a59aaba commit 7b4f50c

5 files changed

Lines changed: 143 additions & 6 deletions

File tree

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public static void captureGroup(
4444
}
4545
// new - searching context for ConsumerDelegate and OffsetCommitCallbackInvoker instead of
4646
// ConsumerCoordinator and KafkaConsumer
47-
if (kafkaConsumerInfo.getConsumerGroup() != null || kafkaConsumerInfo.getmetadata() != null) {
47+
if (kafkaConsumerInfo.getConsumerGroup().isPresent()
48+
|| kafkaConsumerInfo.getmetadata().isPresent()) {
4849
InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class)
4950
.put(consumer, kafkaConsumerInfo);
5051
}

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public static void trackCommitOffset(
3232
return;
3333
}
3434

35-
String consumerGroup = kafkaConsumerInfo.getConsumerGroup().get();
36-
Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().get();
35+
String consumerGroup = kafkaConsumerInfo.getConsumerGroup().orElse(null);
36+
Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().orElse(null);
3737
String clusterId = null;
3838
if (consumerMetadata != null) {
3939
clusterId = InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/DDOffsetCommitCallback.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)
3232
String clusterId = null;
3333

3434
if (kafkaConsumerInfo != null) {
35-
consumerGroup = kafkaConsumerInfo.getConsumerGroup().get();
36-
Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().get();
35+
consumerGroup = kafkaConsumerInfo.getConsumerGroup().orElse(null);
36+
Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().orElse(null);
3737
if (consumerMetadata != null) {
3838
clusterId =
3939
InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public static void captureGroup(
3636
KafkaConsumerInfo kafkaConsumerInfo;
3737
kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, metadata, bootstrapServers);
3838
// new - search for the ConsumerDelegate instead of KafkaConsumer
39-
if (kafkaConsumerInfo.getConsumerGroup() != null || kafkaConsumerInfo.getmetadata() != null) {
39+
if (kafkaConsumerInfo.getConsumerGroup().isPresent()
40+
|| kafkaConsumerInfo.getmetadata().isPresent()) {
4041
InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class)
4142
.put(consumer, kafkaConsumerInfo);
4243
if (coordinator != null) {
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import datadog.trace.agent.test.naming.VersionedNamingTestBase
2+
import datadog.trace.instrumentation.kafka_clients38.DDOffsetCommitCallback
3+
import datadog.trace.instrumentation.kafka_clients38.KafkaConsumerInfo
4+
import org.apache.kafka.clients.consumer.OffsetAndMetadata
5+
import org.apache.kafka.clients.consumer.OffsetCommitCallback
6+
import org.apache.kafka.common.TopicPartition
7+
8+
/**
9+
* Unit test to verify that DSM backlog tracking works even when consumer group is null.
10+
*
11+
* This test reproduces a bug where calling Optional.get() on an empty Optional
12+
* (when consumer group is null) throws NoSuchElementException, causing the entire
13+
* backlog tracking to fail silently.
14+
*
15+
* Before the fix:
16+
* - KafkaConsumerInfo.getConsumerGroup() returns Optional.empty() when consumer group is null
17+
* - DDOffsetCommitCallback calls .get() on this empty Optional
18+
* - NoSuchElementException is thrown
19+
* - No backlog metrics are emitted
20+
*
21+
* After the fix (using .orElse(null) instead of .get()):
22+
* - Backlog metrics are emitted even with null consumer group
23+
*/
24+
class DDOffsetCommitCallbackNullGroupForkedTest extends VersionedNamingTestBase {
25+
26+
@Override
27+
int version() {
28+
return 0
29+
}
30+
31+
@Override
32+
String operation() {
33+
return null
34+
}
35+
36+
@Override
37+
String service() {
38+
return "test-service"
39+
}
40+
41+
@Override
42+
protected boolean isDataStreamsEnabled() {
43+
return true
44+
}
45+
46+
def "test DDOffsetCommitCallback handles null consumer group without exception"() {
47+
setup:
48+
// Create KafkaConsumerInfo with NULL consumer group - this is the key to reproducing the bug
49+
def kafkaConsumerInfo = new KafkaConsumerInfo(null, null, "localhost:9092")
50+
51+
// Verify the consumer group is indeed empty
52+
assert !kafkaConsumerInfo.getConsumerGroup().isPresent() : "Consumer group should be empty for this test"
53+
54+
def callbackInvoked = false
55+
def innerCallback = new OffsetCommitCallback() {
56+
@Override
57+
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
58+
callbackInvoked = true
59+
}
60+
}
61+
62+
def ddCallback = new DDOffsetCommitCallback(innerCallback, kafkaConsumerInfo)
63+
64+
def offsets = [
65+
(new TopicPartition("test-topic", 0)): new OffsetAndMetadata(100L)
66+
]
67+
68+
when:
69+
// This should NOT throw an exception
70+
// Before the fix: NoSuchElementException would be thrown here
71+
// After the fix: completes successfully with null consumer group
72+
ddCallback.onComplete(offsets, null)
73+
74+
then:
75+
// Inner callback should have been invoked
76+
callbackInvoked
77+
78+
// No exception was thrown (implicit - test would fail otherwise)
79+
noExceptionThrown()
80+
}
81+
82+
def "test DDOffsetCommitCallback handles null KafkaConsumerInfo"() {
83+
setup:
84+
def callbackInvoked = false
85+
def innerCallback = new OffsetCommitCallback() {
86+
@Override
87+
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
88+
callbackInvoked = true
89+
}
90+
}
91+
92+
// KafkaConsumerInfo is completely null
93+
def ddCallback = new DDOffsetCommitCallback(innerCallback, null)
94+
95+
def offsets = [
96+
(new TopicPartition("test-topic", 0)): new OffsetAndMetadata(100L)
97+
]
98+
99+
when:
100+
ddCallback.onComplete(offsets, null)
101+
102+
then:
103+
callbackInvoked
104+
noExceptionThrown()
105+
}
106+
107+
def "test DDOffsetCommitCallback with valid consumer group still works"() {
108+
setup:
109+
def kafkaConsumerInfo = new KafkaConsumerInfo("test-group", null, "localhost:9092")
110+
111+
assert kafkaConsumerInfo.getConsumerGroup().isPresent()
112+
assert kafkaConsumerInfo.getConsumerGroup().get() == "test-group"
113+
114+
def callbackInvoked = false
115+
def innerCallback = new OffsetCommitCallback() {
116+
@Override
117+
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
118+
callbackInvoked = true
119+
}
120+
}
121+
122+
def ddCallback = new DDOffsetCommitCallback(innerCallback, kafkaConsumerInfo)
123+
124+
def offsets = [
125+
(new TopicPartition("test-topic", 0)): new OffsetAndMetadata(100L)
126+
]
127+
128+
when:
129+
ddCallback.onComplete(offsets, null)
130+
131+
then:
132+
callbackInvoked
133+
noExceptionThrown()
134+
}
135+
}

0 commit comments

Comments
 (0)