Support adding kafka_cluster_id for Confluent.Kafka#7702
Execution-Time Benchmarks Report ⏱️

Execution-time results for samples comparing This PR (7702) and master.

✅ No regressions detected - check the details below

Full Metrics Comparison
FakeDbCommand
HttpMessageHandler

Comparison explanation

Execution-time benchmarks measure the whole time it takes to execute a program, and are intended to measure the one-off costs. Cases where the execution time results for the PR are worse than latest master results are highlighted in **red**. The following thresholds were used for comparing the execution times:

Note that these results are based on a single point-in-time result for each branch. For full results, see the dashboard. Graphs show the p99 interval based on the mean and StdDev of the test run, as well as the mean value of the run (shown as a diamond below the graph).

Duration charts

Execution time (ms), shown as mean (interval start, interval end):

| Sample | Runtime | Scenario | This PR (7702) | master |
|---|---|---|---|---|
| FakeDbCommand | .NET Framework 4.8 | Baseline | 69 (67, 71) | 70 (66, 73) |
| FakeDbCommand | .NET Framework 4.8 | Bailout | 73 (71, 75) | 73 (72, 74) |
| FakeDbCommand | .NET Framework 4.8 | CallTarget+Inlining+NGEN | 1,047 (993, 1102) | 1,051 (995, 1106) |
| FakeDbCommand | .NET Core 3.1 | Baseline | 108 (104, 111) | 107 (104, 111) |
| FakeDbCommand | .NET Core 3.1 | Bailout | 108 (107, 110) | 109 (107, 110) |
| FakeDbCommand | .NET Core 3.1 | CallTarget+Inlining+NGEN | 727 (708, 746) | 730 (711, 750) |
| FakeDbCommand | .NET 6 | Baseline | 95 (93, 98) | 96 (93, 99) |
| FakeDbCommand | .NET 6 | Bailout | 97 (95, 98) | 96 (94, 98) |
| FakeDbCommand | .NET 6 | CallTarget+Inlining+NGEN | 703 (654, 752) | 709 (668, 751) |
| FakeDbCommand | .NET 8 | Baseline | 94 (91, 97) | 94 (92, 97) |
| FakeDbCommand | .NET 8 | Bailout | 95 (93, 98) | 95 (93, 97) |
| FakeDbCommand | .NET 8 | CallTarget+Inlining+NGEN | 636 (620, 651) | 641 (615, 667) |
| HttpMessageHandler | .NET Framework 4.8 | Baseline | 194 (190, 199) | 196 (191, 201) |
| HttpMessageHandler | .NET Framework 4.8 | Bailout | 197 (195, 200) | 198 (194, 201) |
| HttpMessageHandler | .NET Framework 4.8 | CallTarget+Inlining+NGEN | 1,154 (1103, 1205) | 1,148 (1097, 1199) |
| HttpMessageHandler | .NET Core 3.1 | Baseline | 279 (274, 285) | 277 (273, 282) |
| HttpMessageHandler | .NET Core 3.1 | Bailout | 279 (273, 285) | 279 (274, 284) |
| HttpMessageHandler | .NET Core 3.1 | CallTarget+Inlining+NGEN | 902 (878, 927) | 903 (876, 929) |
| HttpMessageHandler | .NET 6 | Baseline | 272 (268, 276) | 272 (266, 277) |
| HttpMessageHandler | .NET 6 | Bailout | 273 (269, 277) | 271 (267, 275) |
| HttpMessageHandler | .NET 6 | CallTarget+Inlining+NGEN | 941 (912, 970) | 936 (909, 963) |
| HttpMessageHandler | .NET 8 | Baseline | 271 (263, 279) | 271 (263, 278) |
| HttpMessageHandler | .NET 8 | Bailout | 272 (266, 278) | 271 (266, 275) |
| HttpMessageHandler | .NET 8 | CallTarget+Inlining+NGEN | 843 (814, 873) | 835 (814, 855) |
Benchmarks

Benchmark execution time: 2026-03-20 14:34:16

Comparing candidate commit d527467 in PR branch

Found 5 performance improvements and 10 performance regressions! Performance is the same for 251 metrics, 22 unstable metrics.
Force-pushed from f83a435 to 3d16b25
Extract Kafka cluster_id via AdminClient.DescribeClusterAsync using duck typing and add it to span tags, DSM edge tags, and backlog tracking tags.

Changes:
- Add KafkaHelper.GetClusterId() using duck typed Confluent.Kafka AdminClient API with re-entrancy guard (AdminClient internally creates a Producer)
- Extend ConsumerCache/ProducerCache to store cluster_id alongside existing metadata
- Add ClusterId property to KafkaTags (tag: messaging.kafka.cluster_id)
- Include kafka_cluster_id in DSM edge tags for produce/consume checkpoints
- Include kafka_cluster_id in commit/produce backlog tracking tags
- Add duck type interfaces: IAdminClient, IAdminClientBuilder, IAdminClientConfig, IDescribeClusterResult with IDuckTypeTask for async handling
- Add unit tests for cache classes, KafkaTags, and GetClusterId edge cases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
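The re-entrancy guard mentioned in the commit message above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `ReentrancyGuard` class, its `Run` method, and the use of `[ThreadStatic]` are assumptions made for the example; only the `_isGettingClusterId` flag name appears in the diff. The idea is that building an AdminClient internally creates a Producer, which would hit the instrumented constructor again, so a nested call on the same thread should bail out immediately.

```csharp
#nullable enable
using System;

// Hypothetical sketch of a per-thread re-entrancy guard (not the PR's actual code).
public static class ReentrancyGuard
{
    // One flag per thread: a nested call on the same thread sees it set.
    [ThreadStatic]
    private static bool _isGettingClusterId;

    // Runs `lookup` unless a lookup is already in progress on this thread,
    // in which case it returns null without invoking `lookup`.
    public static string? Run(Func<string?> lookup)
    {
        if (_isGettingClusterId)
        {
            return null;
        }

        _isGettingClusterId = true;
        try
        {
            return lookup();
        }
        finally
        {
            // Always reset the flag, even if the lookup throws.
            _isGettingClusterId = false;
        }
    }
}
```

Note that a thread-static flag only protects against synchronous nesting on the same thread; whether that is sufficient depends on how the real lookup is invoked.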
Force-pushed from e3c11d3 to 04181fd
- Fix DSM edge tags alphabetical sort: direction before kafka_cluster_id
- Guard kafka_cluster_id in OffsetsCommittedCallbacks with IsNullOrEmpty check, consistent with all other callsites
- Rename misleading test name to HandlesConnectionFailureGracefully

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The span_attribute_schema markdown files had formatting-only changes (table alignment) that were accidentally included from WIP commits. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Verify the duck typing chain works for IAdminClientConfig, IAdminClientBuilder, and check DescribeClusterAsync availability (requires Confluent.Kafka 2.0+). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…st coverage

Pass the producer instance to TryInjectHeaders so it can query ProducerCache directly for cluster_id, instead of casting span.Tags back to KafkaTags. This matches how the consumer side uses ConsumerCache. Also add cluster_id assertions to integration tests and register the tag as optional in span metadata rules.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tead of sender Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
{
    _isGettingClusterId = true;

    var configType = Type.GetType("Confluent.Kafka.AdminClientConfig, Confluent.Kafka");
afaik this is the only way to get this object, and the builder below
…lusterAsync Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…or cluster_id

Instead of creating a standalone AdminClient with a new connection to the broker, use DependentAdminClientBuilder which reuses the producer/consumer's existing connection. Move cluster_id resolution from OnMethodBegin to OnMethodEnd so the client handle is available after construction.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… fallback

Remove standalone AdminClient fallback and IAdminClientConfig since DependentAdminClientBuilder is available in all supported Confluent.Kafka versions (1.4.0+). Clean up constructor OnMethodEnd to use early returns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
x => x.Tags[Tags.KafkaConsumerGroup] == "Samples.Kafka.AutoCommitConsumer1"
  || x.Tags[Tags.KafkaConsumerGroup] == "Samples.Kafka.ManualCommitConsumer2");

// cluster_id is only available with Confluent.Kafka 2.0+ (DescribeClusterAsync)
Right... I forgot this 🤔 I think we should potentially split our integrations to account for this, because otherwise we're going to be doing expensive work which we know is going to fail when we try to describe the cluster.
There are two ways to do that, depending on how reliably we can detect what version we're in from the KafkaHelper:
- Extract the common code out and create a duplicate of the integration
  - One integration targets 1.x.x only, and does the existing stuff
  - The other integration targets 2.x.x+, and includes the describe cluster work
- Pro-actively try to grab the DescribeClusterOptions type - if it doesn't exist, there's no point trying to build and duck type an AdminClient, so we can (and should) just bail at that point. What's more, we know that can never work, so we should probably cache a null/"" value for the bootstrap servers (and potentially bypass that cache entirely - just always return null if that lookup of the Type has failed...)
I like the second approach in terms of simplicity, I'll do that. Flagging an early null/"" result in that case seems like a good thing to do
(My original comment was wrong and the version requirement is actually > 2.3.0)
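A minimal sketch of the second option discussed in this thread: probe for a type in the assembly that the live client instance came from, and treat its absence as "cluster id lookup can never work". The `TypeProbe` class and `HasType` method are hypothetical names for illustration; the type name to probe for (for example `Confluent.Kafka.Admin.DescribeClusterOptions`) is likewise an assumption and is passed in as a parameter rather than hard-coded.

```csharp
#nullable enable
using System;
using System.Reflection;

// Hypothetical sketch, not the PR's actual code.
public static class TypeProbe
{
    // Returns true when the assembly that defines `instance` also defines a
    // type with the given full name. Assembly.GetType with throwOnError: false
    // returns null when the type is missing instead of throwing.
    public static bool HasType(object instance, string fullTypeName)
    {
        Assembly assembly = instance.GetType().Assembly;
        return assembly.GetType(fullTypeName, throwOnError: false) != null;
    }
}
```

If the probe fails, the caller could record that result once and return an empty cluster id for every subsequent client, so no AdminClient is ever built on unsupported versions.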
zacharycmontoya left a comment
I've added a couple of comments but I think Andrew already did a very comprehensive review. Looking forward to changes based on PR feedback, but in general I think this is looking pretty promising
- Use inline ternary for backlog tags to avoid extra string allocations
- Replace string.IsNullOrEmpty with StringUtil.IsNullOrEmpty throughout
- Simplify GetClusterId with direct DuckCast and using statement
- Accept nullable clusterId in ConsumerCache.SetConsumerGroup
- Use typed Config class instead of string[] in consumer constructor
- Add nullability annotations to duck typing interfaces

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…duck typing

Anchor reflection type lookups to the Confluent.Kafka assembly obtained from the actual client instance, avoiding fragile global assembly searches. Replace raw reflection property access on DescribeClusterOptions with a duck type.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from 79d465e to 3962123
…duck typing

Anchor reflection type lookups to the Confluent.Kafka assembly obtained from the actual client instance, avoiding fragile global assembly searches. Replace raw reflection property access on DescribeClusterOptions with a duck type. Early-return and cache empty result for Confluent.Kafka < 2.3.0 where DescribeClusterAsync is unavailable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from 3962123 to c6531c5
andrewlock left a comment
LGTM overall - just a couple of final tweaks around logging and caching, that should probably be addressed, but otherwise looking good!
zacharycmontoya left a comment
Aside from the small comments, LGTM!
Summary of changes
kafka_cluster_id. Without this, metrics become incorrect when the same topics exist across different clusters (e.g. dev/prod environments).

Note: I drafted an alternative approach calling librdkafka directly here. That gives the cluster id without any external API call or version dependency (it's available and unchanged from librdkafka 1.0.0), but calls into unmanaged code.
Reason for change
This functionality exists in other tracers:
Java (doesn't block, intercepts the metadata response to enrich cluster id going forwards, see here and here)
Node (blocks, see here and here)
Python (blocks with a 1 second timeout, see here and here)
Implementation details
Constructs the Kafka Admin API client and queries for the cluster id directly on consumer/producer startup.
We cache this by bootstrap servers, so we expect to make this API call once.
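As a rough illustration of the caching described above (not the PR's actual code), a cache keyed by the bootstrap servers string might look like the sketch below. `ClusterIdCache`, `GetOrFetch`, and the `fetch` delegate are hypothetical names; `fetch` stands in for the real Admin API call and is assumed to return null on failure.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

// Hypothetical sketch: one cached cluster id per bootstrap-servers string.
public static class ClusterIdCache
{
    private static readonly ConcurrentDictionary<string, string> Cache =
        new ConcurrentDictionary<string, string>();

    // Returns the cached cluster id for `bootstrapServers`, calling `fetch`
    // only when there is no entry yet. A null result is stored as "" so that
    // a failed lookup is not repeated for every new producer/consumer.
    public static string GetOrFetch(string? bootstrapServers, Func<string, string?> fetch)
    {
        if (string.IsNullOrEmpty(bootstrapServers))
        {
            return string.Empty;
        }

        return Cache.GetOrAdd(bootstrapServers!, key => fetch(key) ?? string.Empty);
    }
}
```

`ConcurrentDictionary.GetOrAdd` can invoke the factory more than once if several threads race on the same key, so "once per cluster" is an expectation rather than a strict guarantee in this sketch.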
Test coverage
Other details
These DSM metrics will now be tagged by kafka_cluster_id


Added to spans (I checked this on produce too, I just don't have a screenshot of it)