Implemented logic to prevent tasks from having more than a specified number of partitions #837
Conversation
vmaheshw
left a comment
Apart from the review of the code modification, I pointed out one TODO item and one change to already-committed code.
int findTaskWithRoomForAPartition(List<String> tasks, Map<String, Set<String>> partitionMap, int startIndex,
    int maxPartitionsPerTask) {
  for (int i = 0; i < tasks.size(); i++) {
    int currentIndex = (startIndex + i) % tasks.size();
There is scope for improvement here, especially if the task count is large and most of the tasks are full. We can improve this piece in the future if needed.
I'm open to suggestions on how to improve it.
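One possible direction (a hypothetical sketch, not code from this PR): instead of scanning every task on each call, keep a queue of tasks known to still have room, so a task that becomes full is dropped once and never re-examined. The class and method names below are illustrative only, assuming the same partitionMap shape as the PR's helper.

```java
import java.util.*;

// Hypothetical alternative to the linear scan: track tasks with remaining
// capacity in a queue. Full tasks are removed permanently, so the amortized
// cost per lookup stays low even when most tasks are full.
class RoomTracker {
  private final Deque<String> tasksWithRoom;
  private final Map<String, Set<String>> partitionMap;
  private final int maxPartitionsPerTask;

  RoomTracker(List<String> tasks, Map<String, Set<String>> partitionMap, int maxPartitionsPerTask) {
    this.tasksWithRoom = new ArrayDeque<>();
    this.partitionMap = partitionMap;
    this.maxPartitionsPerTask = maxPartitionsPerTask;
    for (String task : tasks) {
      if (partitionMap.getOrDefault(task, Collections.emptySet()).size() < maxPartitionsPerTask) {
        tasksWithRoom.add(task);
      }
    }
  }

  // Returns a task with room, rotating for round-robin fairness,
  // or null if every task is full.
  String nextTaskWithRoom() {
    while (!tasksWithRoom.isEmpty()) {
      String task = tasksWithRoom.poll();
      if (partitionMap.getOrDefault(task, Collections.emptySet()).size() < maxPartitionsPerTask) {
        tasksWithRoom.add(task); // re-queue so the next call moves on to another task
        return task;
      }
      // task is full: dropped from the queue and never rechecked
    }
    return null;
  }
}
```

The trade-off is that the tracker must see the same live partitionMap the assigner mutates; a task filled between calls is simply discarded on its next poll.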
  LoadBasedPartitionAssigner partitionAssigner = new LoadBasedPartitionAssigner();
  return partitionAssigner.assignPartitions(clusterThroughputInfo, currentAssignment,
-     unassignedPartitions, datastreamPartitions);
+     unassignedPartitions, datastreamPartitions, _partitionsPerTask);
_partitionsPerTask represents the ideal number of partitions per task, used along with _partitionFullnessFactorPct, which represents what percentage of that number to fill. This count is not a hard limit but a desired count; _maxPartitionsPerTask is the limit that is actually enforced. If you enforce based on _partitionsPerTask and numTasks is being limited by a smaller maxTasks configuration, the entire pipeline will be stuck until the maxTasks configuration is corrected. One of the most important purposes of enforcing _maxPartitionsPerTask is to keep the ZK node size from exceeding 1 MB.
To add to the above point: if _enablePartitionNumBasedTaskCountEstimation is enabled, then we can consider honoring _partitionsPerTask and _partitionFullnessFactorPct along with _maxPartitionsPerTask, which means we would first try to fill up to _partitionsPerTask if we can, and if we cannot, stretch up to _maxPartitionsPerTask.
There is also scope for reusing the PARTITIONS_PER_TASK_NEEDS_ADJUSTMENT alert here.
At the moment, I'm simply using _maxPartitionsPerTask. We can discuss the usage of partitionsPerTask and partitionFullnessFactorPct offline.
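To make the two-tier idea discussed above concrete, here is a hedged sketch (not this PR's code, and all names are illustrative): partitions are first placed up to a soft target derived from partitionsPerTask and partitionFullnessFactorPct, and only the leftover partitions stretch tasks up to the hard maxPartitionsPerTask limit.

```java
import java.util.*;

// Illustrative sketch of honoring partitionsPerTask and the fullness factor
// as a soft cap, with maxPartitionsPerTask as the enforced hard cap.
class TwoTierAssign {
  static Map<String, List<String>> assign(List<String> tasks, List<String> partitions,
      int partitionsPerTask, int fullnessFactorPct, int maxPartitionsPerTask) {
    // soft cap = ceil(partitionsPerTask * fullnessFactorPct / 100), never above the hard cap
    int softCap = Math.min((partitionsPerTask * fullnessFactorPct + 99) / 100, maxPartitionsPerTask);
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    tasks.forEach(t -> assignment.put(t, new ArrayList<>()));
    Deque<String> remaining = new ArrayDeque<>(partitions);
    // pass 1 fills to the soft target; pass 2 stretches to the hard limit
    for (int cap : new int[] {softCap, maxPartitionsPerTask}) {
      for (String task : tasks) {
        while (!remaining.isEmpty() && assignment.get(task).size() < cap) {
          assignment.get(task).add(remaining.poll());
        }
      }
    }
    if (!remaining.isEmpty()) {
      // mirrors the hard-cap behavior: never exceed maxPartitionsPerTask
      throw new IllegalStateException(remaining.size() + " partitions could not be placed");
    }
    return assignment;
  }
}
```

With 2 tasks, 5 partitions, partitionsPerTask = 4, fullnessFactorPct = 50, and maxPartitionsPerTask = 3, the soft cap is 2, so each task first takes 2 partitions and only one task stretches to 3 for the leftover.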
vmaheshw
left a comment
One javadoc nit, and keeping the maxPartitionsPerTask vs. partitionsPerTask discussion open. But this PR does not have to be blocked on that; it can be an incremental fix.
@@ -40,9 +44,10 @@ public class LoadBasedPartitionAssigner {
   */
The new param is missing from the javadoc.