Skip to content

Commit fd4368f

Browse files
Naireen and a co-author authored
Add Backlog Metrics to Kafka Splittable DoFn Implementation (#31281)
* Set backlog in gauge metric * add backlog metrics to splittable DoFn Kafka read implementation --------- Co-authored-by: Naireen <naireenhussain@google.com>
1 parent bb4c1e6 commit fd4368f

1 file changed

Lines changed: 18 additions & 0 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
3333
import org.apache.beam.sdk.io.range.OffsetRange;
3434
import org.apache.beam.sdk.metrics.Distribution;
35+
import org.apache.beam.sdk.metrics.Gauge;
3536
import org.apache.beam.sdk.metrics.Metrics;
3637
import org.apache.beam.sdk.transforms.DoFn;
3738
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -222,6 +223,9 @@ private ReadFromKafkaDoFn(
222223

223224
private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
224225
private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
226+
227+
private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
228+
225229
@VisibleForTesting final long consumerPollingTimeout;
226230
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
227231
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -342,6 +346,13 @@ public double getSize(
342346
if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
343347
return numRecords;
344348
}
349+
if (offsetEstimatorCache != null) {
350+
for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
351+
offsetEstimatorCache.entrySet()) {
352+
perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
353+
}
354+
}
355+
345356
return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
346357
}
347358

@@ -394,6 +405,13 @@ public ProcessContinuation processElement(
394405
Metrics.distribution(
395406
METRIC_NAMESPACE,
396407
RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
408+
for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
409+
Gauge backlog =
410+
Metrics.gauge(
411+
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
412+
backlog.set(backlogSplit.getValue());
413+
}
414+
397415
// Stop processing current TopicPartition when it's time to stop.
398416
if (checkStopReadingFn != null
399417
&& checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {

0 commit comments

Comments (0)