tests for min/max partitions per task metrics and minor code quality improvements#887
tests for min/max partitions per task metrics and minor code quality improvements#887vmaheshw merged 7 commits intolinkedin:masterfrom
Conversation
|
Use the diff to try out. The test case creates 2 datastreams. For ds1: the expectation is minTasks as 2 and maxTasks as 3, for ds2: the expectation is minTasks as 1 and maxTasks as 1. |
datastream-common/src/main/java/com/linkedin/datastream/metrics/DynamicMetricsManager.java
Outdated
Show resolved
Hide resolved
| return newTask; | ||
| } | ||
| Map<String, Set<DatastreamTask>> newAssignments = currentAssignment.entrySet().stream() | ||
| .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream() |
There was a problem hiding this comment.
nit: you can replace lambda with method reference
x -> x.getKey() -> Map.Entry::getKey
| .collect(Collectors.toSet()))); | ||
|
|
||
| IntSummaryStatistics stats = newAssignments.values().stream() | ||
| .flatMap(x -> x.stream()) // flatten |
There was a problem hiding this comment.
nit: you can replace x -> x.stream() with Collection::stream
| IntSummaryStatistics stats = newAssignments.values().stream() | ||
| .flatMap(x -> x.stream()) // flatten | ||
| .filter(x -> x.getTaskPrefix().equals(datastreamGroupName)) | ||
| .collect(Collectors.summarizingInt(x -> x.getPartitionsV2().size())); |
There was a problem hiding this comment.
This code block has simplified the code with the overhead of an extra walk on the entire assignment list. I think it should be fine.
There was a problem hiding this comment.
Well notice that the previous version has several side effects within a stream pipeline. The surrounding code is similarly problematic, but I have left unchanged for now, as I don't understand the code very well (probably cuz of all the side effects).
...ain/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
Show resolved
Hide resolved
datastream-common/src/main/java/com/linkedin/datastream/metrics/DynamicMetricsManager.java
Outdated
Show resolved
Hide resolved
| } else if (clazz.equals(Timer.class)) { | ||
| return (T) new Timer(); | ||
| return (T) _metricRegistry.timer(name); | ||
| } else { |
There was a problem hiding this comment.
I don't see the usage of Timer.class to be able to comment why Timer class was done in a different way. Do you have any idea?
There was a problem hiding this comment.
no, but both Timer and Gauge were done this way (new T()) instead of actually registering with the underlying MetricRegistry. It seems that any attempt to register a Gauge or Timer wasn't really doing anything prior to this change.
There was a problem hiding this comment.
What's the reason gauge registrations are illegal here?
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import com.codahale.metrics.Gauge; | ||
|
|
| private final int _maxPartitionsAcrossTasks; | ||
|
|
||
| public static final PartitionAssignmentStats DEFAULT = new PartitionAssignmentStats(0, 0); | ||
| private static class DatastreamMetrics { |
There was a problem hiding this comment.
Suppose in the future there is a requirement to expose methods for minPartitionsAcrossTasks and maxPartitionsAcrossTasks, will we use the gauge to return it?
There was a problem hiding this comment.
no. I'd think we'd track that in the outer class and provide getters.
I do see that there is an incidental public here, so one could argue I'm accidentally dropping a public API. Concern?
There was a problem hiding this comment.
I think that was one of the idea of the original code to track those values so that they can be exposed using getters. With the new code, that data structure is dropped.
There was a problem hiding this comment.
Thoughts? We seem to be stuck on this issue. I was trying to minimize the lifecycle mgmt we are doing here, since this class doesn't really have any hooks into when datastreams are created or deleted etc. I think it's a bad idea to piggyback on the metrics registration/deregistration hooks that we do have.
If we need to track these values and expose via getters, I think we'd need to add more lifecycle hooks. I'm skeptical we'd do that in the interface we're impl'ing here.
There was a problem hiding this comment.
Seems like we're not exposing these outside this class at the moment. I'd suggest to worry about how that is happening if there's really a need to use these metric values outside LoadBasedPartitionAssigner
| Assert.assertEquals(statObj.getThroughputRateInKBps(), 5); | ||
|
|
||
| assertMetricEquals("LoadBasedPartitionAssigner.ds1.minPartitionsAcrossTasks", 1); | ||
| assertMetricEquals("LoadBasedPartitionAssigner.ds1.maxPartitionsAcrossTasks", 1); |
|
|
||
| assertMetricEquals("LoadBasedPartitionAssigner.ds1.minPartitionsAcrossTasks", 2); | ||
| assertMetricEquals("LoadBasedPartitionAssigner.ds1.maxPartitionsAcrossTasks", 2); | ||
|
|
||
| MetricsTestUtils.verifyMetrics(assigner, DynamicMetricsManager.getInstance()); |
| _metricsForDatastream.keySet().forEach(this::unregisterMetricsForDatastream); | ||
| } | ||
|
|
||
| void unregisterMetricsForDatastream(String datastream) { |
There was a problem hiding this comment.
Can you double check if there is a test to cover this code path of registration and deregistration? My guess is no, otherwise, the test would have caught issue with the older patch. Can you add a test to cover that scenario?
|
I'll squash these commits once outstanding comments are resolved. |
jzakaryan
left a comment
There was a problem hiding this comment.
lgtm. I think we can go ahead with the way metrics registrations are handled in LoadBasedPartitionAssigner. I also left a comment about Gauge registrations, but that shouldn't stop you from merging the code.
| private final int _maxPartitionsAcrossTasks; | ||
|
|
||
| public static final PartitionAssignmentStats DEFAULT = new PartitionAssignmentStats(0, 0); | ||
| private static class DatastreamMetrics { |
There was a problem hiding this comment.
Seems like we're not exposing these outside this class at the moment. I'd suggest to worry about how that is happening if there's really a need to use these metric values outside LoadBasedPartitionAssigner
| } else if (clazz.equals(Timer.class)) { | ||
| return (T) new Timer(); | ||
| return (T) _metricRegistry.timer(name); | ||
| } else { |
There was a problem hiding this comment.
What's the reason gauge registrations are illegal here?
Fixed bug where min/max partitions per task was being reported incorrectly. Previously, the min/max were the same across all datastreams.
New setGauge method in DynamicMetricsManager enables explicitly setting a gauge value. This reduces some lifecycle management complexity in LoadBalancedPartitionAssigner.
Incidental code quality improvements.
Tests added to verify affected metrics (min/maxPartitionsPerTask).
Fixed bug related to registration of Gauges and Timers (they weren't actually registered before).
Dropped (accidentally?) exported class that was holding min/max stats per task.