tests for min/max partitions per task metrics and minor code quality improvements #887

Merged
vmaheshw merged 7 commits into linkedin:master from ryannedolan:master
Apr 12, 2022

Conversation

@ryannedolan
Collaborator

@ryannedolan ryannedolan commented Feb 26, 2022

Fixed a bug where min/max partitions per task were being reported incorrectly. Previously, the min/max were the same across all datastreams.

A new setGauge method in DynamicMetricsManager enables explicitly setting a gauge value. This reduces some lifecycle management complexity in LoadBasedPartitionAssigner.
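
A rough sketch of what "explicitly setting a gauge value" can look like, for readers who have not seen the pattern. This is not the DynamicMetricsManager code from this PR: the class SettableGaugeRegistry and the methods readGauge and unregister are hypothetical names, and the sketch uses only the JDK. The idea is that the registry owns one mutable holder per gauge name, so the caller just pushes values and does not have to keep a supplier object alive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metrics manager; not the real DynamicMetricsManager API.
final class SettableGaugeRegistry {
  // One mutable holder per fully qualified gauge name.
  private final Map<String, AtomicLong> gauges = new ConcurrentHashMap<>();

  // Creates the gauge on first use, then overwrites its current value.
  void setGauge(String name, long value) {
    gauges.computeIfAbsent(name, k -> new AtomicLong()).set(value);
  }

  // Returns the last value set, or null if the gauge was never set or was removed.
  Long readGauge(String name) {
    AtomicLong holder = gauges.get(name);
    return holder == null ? null : holder.get();
  }

  // Drops the gauge, for example when its datastream goes away.
  void unregister(String name) {
    gauges.remove(name);
  }
}
```

With this shape, the assigner only has to call setGauge after each assignment and unregister on cleanup; it does not need to track per-datastream stat objects itself.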

Incidental code quality improvements.

Tests added to verify affected metrics (min/maxPartitionsPerTask).

Fixed bug related to registration of Gauges and Timers (they weren't actually registered before).

Dropped an (accidentally?) exported class that was holding min/max stats per task.

@vmaheshw
Collaborator

vmaheshw commented Mar 1, 2022

ryanne-test-case.txt

Use the attached diff to try it out. The test case creates 2 datastreams. For ds1, the expectation is minTasks as 2 and maxTasks as 3; for ds2, the expectation is minTasks as 1 and maxTasks as 1.

return newTask;
}
Map<String, Set<DatastreamTask>> newAssignments = currentAssignment.entrySet().stream()
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream()
Collaborator

nit: you can replace the lambda with a method reference:

x -> x.getKey() becomes Map.Entry::getKey
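
For illustration, a small self-contained sketch of the suggested form. The class MethodRefDemo, the method sizes, and the map contents are made up for this example and are not part of the PR; only Collectors.toMap and Map.Entry::getKey are the real JDK pieces being shown.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class; not code from the PR.
final class MethodRefDemo {
  // Maps each key to the size of its value set.
  // Map.Entry::getKey replaces the equivalent lambda x -> x.getKey().
  static Map<String, Integer> sizes(Map<String, Set<String>> in) {
    return in.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
  }
}
```

The behavior is identical to the lambda version; the method reference only removes the throwaway parameter name.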

.collect(Collectors.toSet())));

IntSummaryStatistics stats = newAssignments.values().stream()
.flatMap(x -> x.stream()) // flatten
Collaborator

nit: you can replace x -> x.stream() with Collection::stream

Comment on lines +193 to +196
IntSummaryStatistics stats = newAssignments.values().stream()
.flatMap(x -> x.stream()) // flatten
.filter(x -> x.getTaskPrefix().equals(datastreamGroupName))
.collect(Collectors.summarizingInt(x -> x.getPartitionsV2().size()));
Collaborator

This block simplifies the code at the cost of an extra pass over the entire assignment list. I think that should be fine.

Collaborator Author

Well, notice that the previous version has several side effects within a stream pipeline. The surrounding code is similarly problematic, but I have left it unchanged for now, as I don't understand the code very well (probably because of all the side effects).
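
To make the contrast concrete, here is a minimal side-effect-free sketch of computing min/max partitions per task for one datastream group. It follows the shape of the hunk above, but the Task class, its fields, and the sample data are hypothetical stand-ins rather than the real DatastreamTask API.

```java
import java.util.Collection;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; not code from the PR.
final class PartitionStatsDemo {
  // Hypothetical stand-in for a task: a group prefix plus its partition count.
  static final class Task {
    final String prefix;
    final int partitionCount;

    Task(String prefix, int partitionCount) {
      this.prefix = prefix;
      this.partitionCount = partitionCount;
    }
  }

  // Walks every task on every instance, keeps only the given group,
  // and summarizes the partition counts. Nothing outside the pipeline is mutated.
  static IntSummaryStatistics statsFor(Map<String, List<Task>> assignment, String group) {
    return assignment.values().stream()
        .flatMap(Collection::stream)
        .filter(t -> t.prefix.equals(group))
        .collect(Collectors.summarizingInt(t -> t.partitionCount));
  }
}
```

Because the filter is applied per group, each datastream gets its own min and max. One thing to watch for: if no task matches, IntSummaryStatistics reports getMin() as Integer.MAX_VALUE and getMax() as Integer.MIN_VALUE, so a caller may want to check getCount() before publishing the values.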

Comment on lines 141 to 143
  } else if (clazz.equals(Timer.class)) {
-   return (T) new Timer();
+   return (T) _metricRegistry.timer(name);
  } else {
Collaborator

I don't see any usage of Timer.class, so I can't comment on why the Timer class was handled differently. Do you have any idea?

Collaborator Author

No, but both Timer and Gauge were done this way (new T()) instead of actually registering with the underlying MetricRegistry. It seems that any attempt to register a Gauge or Timer wasn't really doing anything prior to this change.
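
A small sketch of why the new T() form is a silent no-op from a reporting point of view. The Registry and Counter classes below are hypothetical, JDK-only stand-ins (not the Dropwizard MetricRegistry or Brooklin code); they just mimic get-or-create semantics, which is how MetricRegistry.timer(name) behaves.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class; not code from the PR.
final class RegistrationDemo {
  // Hypothetical minimal metric; stands in for a Timer or Gauge.
  static final class Counter {
    final LongAdder count = new LongAdder();
  }

  // Hypothetical registry: only metrics stored here are visible to reporters.
  static final class Registry {
    private final Map<String, Counter> metrics = new ConcurrentHashMap<>();

    // Get-or-create: returns the registered instance, creating it on first use.
    Counter getOrCreate(String name) {
      return metrics.computeIfAbsent(name, k -> new Counter());
    }

    boolean contains(String name) {
      return metrics.containsKey(name);
    }
  }

  // The pattern described above as the old behavior: the object works,
  // but no registry knows about it, so nothing can ever report it.
  static Counter createDetached() {
    return new Counter();
  }
}
```

A detached instance can be updated without error, which is likely why the problem went unnoticed: the caller sees a working metric, while the registry never does.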

Collaborator

What's the reason gauge registrations are illegal here?

import org.slf4j.LoggerFactory;

import com.codahale.metrics.Gauge;

Collaborator

nit: empty line.

private final int _maxPartitionsAcrossTasks;

public static final PartitionAssignmentStats DEFAULT = new PartitionAssignmentStats(0, 0);
private static class DatastreamMetrics {
Collaborator

Suppose in the future there is a requirement to expose methods for minPartitionsAcrossTasks and maxPartitionsAcrossTasks. Will we use the gauge to return them?

Collaborator Author

no. I'd think we'd track that in the outer class and provide getters.

I do see that there is an incidental public here, so one could argue I'm accidentally dropping a public API. Concern?

Collaborator

I think that was one of the ideas behind the original code: to track those values so that they can be exposed using getters. With the new code, that data structure is dropped.

Collaborator Author

Thoughts? We seem to be stuck on this issue. I was trying to minimize the lifecycle mgmt we are doing here, since this class doesn't really have any hooks into when datastreams are created or deleted etc. I think it's a bad idea to piggyback on the metrics registration/deregistration hooks that we do have.

If we need to track these values and expose via getters, I think we'd need to add more lifecycle hooks. I'm skeptical we'd do that in the interface we're impl'ing here.

Collaborator

Seems like we're not exposing these outside this class at the moment. I'd suggest worrying about how to do that only if there's really a need to use these metric values outside LoadBasedPartitionAssigner.

Assert.assertEquals(statObj.getThroughputRateInKBps(), 5);

assertMetricEquals("LoadBasedPartitionAssigner.ds1.minPartitionsAcrossTasks", 1);
assertMetricEquals("LoadBasedPartitionAssigner.ds1.maxPartitionsAcrossTasks", 1);
Collaborator

formatting issue.

Comment on lines +202 to +206

assertMetricEquals("LoadBasedPartitionAssigner.ds1.minPartitionsAcrossTasks", 2);
assertMetricEquals("LoadBasedPartitionAssigner.ds1.maxPartitionsAcrossTasks", 2);

MetricsTestUtils.verifyMetrics(assigner, DynamicMetricsManager.getInstance());
Collaborator

nit: formatting issue.

_metricsForDatastream.keySet().forEach(this::unregisterMetricsForDatastream);
}

void unregisterMetricsForDatastream(String datastream) {
Collaborator

Can you double-check whether there is a test covering this registration and deregistration code path? My guess is no; otherwise, the test would have caught the issue with the older patch. Can you add a test to cover that scenario?

Collaborator Author

Can do.

@ryannedolan
Collaborator Author

I'll squash these commits once outstanding comments are resolved.

Collaborator

@jzakaryan jzakaryan left a comment

lgtm. I think we can go ahead with the way metrics registrations are handled in LoadBasedPartitionAssigner. I also left a comment about Gauge registrations, but that shouldn't stop you from merging the code.

@vmaheshw vmaheshw merged commit a67108b into linkedin:master Apr 12, 2022