|
32 | 32 | import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; |
33 | 33 | import org.apache.beam.sdk.io.range.OffsetRange; |
34 | 34 | import org.apache.beam.sdk.metrics.Distribution; |
| 35 | +import org.apache.beam.sdk.metrics.Gauge; |
35 | 36 | import org.apache.beam.sdk.metrics.Metrics; |
36 | 37 | import org.apache.beam.sdk.transforms.DoFn; |
37 | 38 | import org.apache.beam.sdk.transforms.SerializableFunction; |
@@ -222,6 +223,9 @@ private ReadFromKafkaDoFn( |
222 | 223 |
|
223 | 224 | private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize; |
224 | 225 | private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; |
| 226 | + |
| 227 | + private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>(); |
| 228 | + |
225 | 229 | @VisibleForTesting final long consumerPollingTimeout; |
226 | 230 | @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider; |
227 | 231 | @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider; |
@@ -342,6 +346,13 @@ public double getSize( |
342 | 346 | if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) { |
343 | 347 | return numRecords; |
344 | 348 | } |
| 349 | + if (offsetEstimatorCache != null) { |
| 350 | + for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp : |
| 351 | + offsetEstimatorCache.entrySet()) { |
| 352 | + perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate()); |
| 353 | + } |
| 354 | + } |
| 355 | + |
345 | 356 | return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords); |
346 | 357 | } |
347 | 358 |
|
@@ -394,6 +405,13 @@ public ProcessContinuation processElement( |
394 | 405 | Metrics.distribution( |
395 | 406 | METRIC_NAMESPACE, |
396 | 407 | RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString()); |
| 408 | + for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) { |
| 409 | + Gauge backlog = |
| 410 | + Metrics.gauge( |
| 411 | + METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey()); |
| 412 | + backlog.set(backlogSplit.getValue()); |
| 413 | + } |
| 414 | + |
397 | 415 | // Stop processing current TopicPartition when it's time to stop. |
398 | 416 | if (checkStopReadingFn != null |
399 | 417 | && checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) { |
|
0 commit comments