Separated Metrics Handling for Throughput Violating Topics #930
Conversation
.getMetadata()
.put(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
LOG.info(
    "Feature handling throughput violations disabled. Flushed throughput violating topics for datastream {}",
what does Flushed mean here?
Discarded.
Will it make more sense to use Discarded instead of Flushed?
String testCluster = "testThroughputViolatingTopicsHandlingForSingleDatastream";
String connectorType = "connectorType";
String streamName = "testThroughputViolatingTopicsHandlingForSingleDatastream";
String numThroughputViolatingTopicsMetric =
You should reference the actual metric rather than redefining it.
Is this just for testing purposes?
Yes, this is only for testing purposes. I went this route since other tests behave similarly, as the metrics subclass is private. But I have instead created a visible-for-testing function to retrieve the metric name.
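The accessor described in the reply above could look like the following minimal sketch. The class and method names here are assumptions for illustration, not the actual Brooklin API:

```java
// Hypothetical sketch: expose the metric name through a package-private,
// visible-for-testing helper so tests reference the real constant
// instead of redefining the string literal.
public class ThroughputViolationMetrics {
  private static final String NUM_THROUGHPUT_VIOLATING_TOPICS = "numThroughputViolatingTopics";

  // Visible-for-testing accessor: tests call this rather than
  // duplicating the metric name string in the test class.
  static String getNumThroughputViolatingTopicsMetricName(String datastreamName) {
    return datastreamName + "." + NUM_THROUGHPUT_VIOLATING_TOPICS;
  }
}
```

This keeps the metric name in one place, so a rename cannot silently break the test.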
_throughputViolatingTopicsMap.put(datastream.getName(), new HashSet<>(Arrays.asList(violatingTopics)));
}
.getOrDefault(DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS, StringUtils.EMPTY);
String[] violatingTopics = Arrays.stream(commaSeparatedViolatingTopics.split(","))
What if the parsing is incorrect, or the message got trimmed or is otherwise malformed?
Would it make sense to have a clean try-catch block?
The function populateThroughputViolatingTopicsMap already sits within a try-catch.
I also added another unit test for the malformed-metadata scenario, so I don't think we need another try-catch.
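A tolerant version of the parsing discussed above could look like this sketch: trim each entry and drop blanks, so trimmed or malformed metadata degrades to a best-effort set instead of throwing. The class name is an assumption for illustration:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of tolerant parsing for the comma-separated
// THROUGHPUT_VIOLATING_TOPICS metadata value.
public class ViolatingTopicsParser {
  static Set<String> parse(String commaSeparatedViolatingTopics) {
    if (commaSeparatedViolatingTopics == null || commaSeparatedViolatingTopics.isEmpty()) {
      return Collections.emptySet();
    }
    return Arrays.stream(commaSeparatedViolatingTopics.split(","))
        .map(String::trim)          // tolerate stray whitespace
        .filter(topic -> !topic.isEmpty())  // drop blanks from trailing/double commas
        .collect(Collectors.toSet());
  }
}
```

With this shape, inputs like `"topicA, ,topicB,"` yield two topics rather than an exception or empty entries.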
_metricInfos.add(new BrooklinGaugeInfo(_coordinator.buildMetricName(MODULE, metricName)));
}
// registers a new gauge or updates the supplier for the gauge if it already exists
This looks like a generic call. Why is this required? We have other instances of `Gauge` and do not have this.
Based on this reasoning, we would have to do it for all the `Gauge` instances and move this method to a generic location.
I added this new method since I wanted to report the change in the number of violations per datastream, and hence needed a gauge keyed on dynamic keys (datastream names).
`registerGauge` registers a new gauge for a new key, but for an existing key it returns the already registered gauge; hence this function updates the supplier function for an already registered gauge metric.
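The register-or-set behavior described in the reply above boils down to a "put or replace" on the gauge's value supplier. The following is a minimal stand-in sketch (the real code delegates to `DynamicMetricsManager`; this registry class is hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of the register-or-set gauge pattern: a new key
// registers a gauge, while an existing key gets its value supplier
// replaced so the gauge reports the latest snapshot.
public class GaugeRegistry {
  private final Map<String, Supplier<?>> _gauges = new ConcurrentHashMap<>();

  <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
    _gauges.put(metricName, valueSupplier);  // put == register new or replace existing
  }

  Object getValue(String metricName) {
    Supplier<?> supplier = _gauges.get(metricName);
    return supplier == null ? null : supplier.get();
  }
}
```

This is why a plain `registerGauge` is not enough here: for a datastream whose violation set changes, the already-registered gauge would keep reporting through the stale supplier.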
// registers a new gauge or updates the supplier for the gauge if it already exists
private <T> void registerOrSetGauge(String metricName, Supplier<T> valueSupplier) {
    _dynamicMetricsManager.setGauge(_dynamicMetricsManager.registerGauge(MODULE, metricName, valueSupplier),
Will this metric get emitted for a zero value as well?
This can be a concern if enabled for the change capture cluster with many datastreams. It would make sense to have aggregate-level metrics and selectively enable datastream-level metrics, because X datastreams mean X gauges.
This will only be emitted where the feature of handling bad actors is enabled.
I also updated the logic to register/update a gauge only when this metadata field exists for that datastream.
reportEventLatencyMetrics(metadata, eventsSourceTimestamp, THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, AGGREGATE, TOTAL_EVENTS_PRODUCED, 1);
_dynamicMetricsManager.createOrUpdateCounter(MODULE, _datastreamTask.getConnectorType(), TOTAL_EVENTS_PRODUCED,
This needs more precise documentation explaining that X events come from throughput-violating topics, Y events from regular topics, and the total record count is X+Y.
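The X+Y accounting requested above can be illustrated with a small sketch. The class is hypothetical and stands in for the separate counters the PR maintains:

```java
// Hypothetical sketch of the counter split: violating and regular
// events are counted separately, and totalEventsProduced is their sum.
public class ProducedEventCounters {
  private long _violatingEvents;  // X: events from throughput-violating topics
  private long _regularEvents;    // Y: events from regular topics

  void record(boolean isViolatingTopic) {
    if (isViolatingTopic) {
      _violatingEvents++;
    } else {
      _regularEvents++;
    }
  }

  long totalEventsProduced() {
    return _violatingEvents + _regularEvents;  // X + Y
  }
}
```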
static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";
What about the other metrics from lines 67-70? Does the reporting need to be split for those as well?
private static final String EVENTS_PRODUCED_WITHIN_SLA = "eventsProducedWithinSla";
private static final String EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA = "eventsProducedWithinAlternateSla";
For these violating events, we can skip the EVENTS_PRODUCED_WITHIN_SLA metric, but I updated the code to emit EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA, as that would still be applicable.
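The SLA-reporting split described in the reply above can be sketched as follows: violating events never count toward the regular within-SLA metric, but still count toward the alternate-SLA metric when they land within it. The class, method, and threshold parameters are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the SLA reporting choice: violating events
// skip eventsProducedWithinSla but may still emit
// eventsProducedWithinAlternateSla.
public class SlaReporter {
  static List<String> metricsToEmit(boolean isViolatingTopic, long latencyMs,
      long slaMs, long alternateSlaMs) {
    List<String> metrics = new ArrayList<>();
    if (!isViolatingTopic && latencyMs <= slaMs) {
      metrics.add("eventsProducedWithinSla");       // regular topics only
    }
    if (latencyMs <= alternateSlaMs) {
      metrics.add("eventsProducedWithinAlternateSla");  // applies to both
    }
    return metrics;
  }
}
```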
static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
Do you need additional metric emission checks?
The EventProducer of every DatastreamTask reports SLA and latency metrics for every datastream record. But when topics have higher throughput than Brooklin's permissible thresholds (in at least one partition), they introduce latency and SLA misses in the mirroring pipeline.
Part 1 of this work, Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API, is already merged and can be referenced here.
This pull request is the second part of the changes, handling the metrics and reporting of throughput-violating topics separately. It introduces the following changes: