Skip to content

KAFKA-12675: improve the sticky general assignor scalability and performance#10552

Merged
guozhangwang merged 6 commits into
apache:trunkfrom
showuon:KAFKA-12675
Jun 2, 2021
Merged

KAFKA-12675: improve the sticky general assignor scalability and performance#10552
guozhangwang merged 6 commits into
apache:trunkfrom
showuon:KAFKA-12675

Conversation

@showuon

@showuon showuon commented Apr 17, 2021

Copy link
Copy Markdown
Member

I did code refactor/optimization, keep the same algorithm in this PR.
I've achieved:

  1. Originally, With this setting:
topicCount = 50;
partitionCount = 800;
consumerCount = 800;

We complete in 10 seconds, after my code refactor, the time down to 100~200 ms

  1. With the 1 million partitions setting:
topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;

No OutOfMemory will be thrown anymore. The time will take 4~5 seconds.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon showuon Apr 17, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor 1:
We used to have 2 map consumer2AllPotentialPartitions and partition2AllPotentialConsumers. But that would need a lot of memory here, ex: consumer2AllPotentialPartitions will need 2000 map, and each map contains 1M partitions (suppose 1 million partition and 2000 consumers). But actually, we only need to store the topics of each potential partitions/consumers, and mapped with partitionsPerTopic. so I changed to topic2AllPotentialConsumers and consumer2AllPotentialTopics. Save memory and save time.

@showuon showuon Apr 17, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor 2:
We used to have an ArrayList of unassignedPartitions, with all sorted partitions (ex: 1 million partitions), and loop through current assignment, to remove already assigned partitions, ex: 999,000 of them, so we'll only have 1000 partitions left. However, the ArrayList element remove is pretty slow for huge size because it needs to find element first, and then, do arrayCopy for the removed array with size of (originalSize -1). This situation should happen a lot since each rebalance, we should only have small set of changes (ex: 1 consumer dropped), so this is an important improvement.

To refactor it, I used two pointer technique to loop through 2 sorted list: sortedPartitions and sortedToBeRemovedPartitions. And only add the difference set of the 2 lists. The looping and element adding is very fast in ArrayList. So, it improves a lot.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use unassignedPartitions and sortedPartitions as the base list, so make them refer to the same list to save memory when brand-new assignment.

@showuon showuon Apr 17, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor 3:
We used to have a sortedPartitionConsumersByGeneration map to store all partitions with all generation/consumer, to compute the currentAssignment and prevAssignment. It takes many memory and slow down the calculation. Improve it by computing the currentAssignment and prevAssignment while looping the subscriptions list (referred to the allSubscriptionsEqual method :)) .

@showuon showuon Apr 17, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor 4: To have sortPartitions list, we used to sort all of the partitions. To improve it, I sort all topics first(only 500 topics to sort, compared to the original 1 million partitions to sort), and then add the partitions by looping all sorted topics.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe to only 1 topic for the last consumer

@showuon

showuon commented Apr 17, 2021

Copy link
Copy Markdown
Member Author

@ableegoldman , please help review this PR. Thank you.

twmb added a commit to twmb/franz-go that referenced this pull request Apr 17, 2021
As @showuon pointed out in github.com/apache/kafka/pull/10552,
tracking partitionPotentials (in Java, partition2AllPotentialConsumers)
is a huge waste of memory when we only need to know potential topic
consumers. If we knock that out, we knock out the most expensive
allocation as well as a lot of hot looping.

Also, if the members are consistently heap sorted by the least loaded
member, then assigning parttions gets much faster.

Lastly, we can knock out more allocations by getting rid of partNum.
This does unfortunately slow things down in the complex graph case, but
that only happened in one benchmark. Overall it may be a wash.

name                           old time/op    new time/op    delta
Large-8                          9.32ms ± 1%    4.95ms ± 1%   -46.89%  (p=0.000 n=10+10)
LargeWithExisting-8              15.7ms ± 1%    12.8ms ± 1%   -18.66%  (p=0.000 n=10+10)
LargeImbalanced-8                25.7ms ±27%   144.7ms ±14%  +462.35%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8    15.7ms ± 1%    12.7ms ± 1%   -19.38%  (p=0.000 n=10+10)
Java/large-8                      2.63s ± 1%     0.40s ± 2%   -84.72%  (p=0.000 n=10+10)
Java/large_imbalance-8            13.4s ± 5%      0.5s ± 3%   -96.62%  (p=0.000 n=9+10)
Java/medium-8                    70.9ms ± 1%    17.9ms ± 1%   -74.73%  (p=0.000 n=10+9)
Java/medium_imbalance-8           216ms ± 1%      22ms ± 1%   -89.78%  (p=0.000 n=10+9)
Java/small-8                     49.3ms ± 0%    14.4ms ± 1%   -70.79%  (p=0.000 n=10+10)
Java/small_imbalance-8            149ms ± 0%      17ms ± 1%   -88.46%  (p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
Large-8                          7.12MB ± 0%    4.43MB ± 0%   -37.73%  (p=0.000 n=10+10)
LargeWithExisting-8              9.60MB ± 0%    6.94MB ± 0%   -27.71%  (p=0.000 n=9+9)
LargeImbalanced-8                17.0MB ± 0%     4.7MB ± 1%   -72.09%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8    9.60MB ± 0%    7.00MB ± 0%   -27.13%  (p=0.000 n=10+9)
Java/large-8                      531MB ± 0%     441MB ± 0%   -17.09%  (p=0.000 n=10+9)
Java/large_imbalance-8           8.54GB ± 0%    0.50GB ± 0%   -94.10%  (p=0.000 n=10+7)
Java/medium-8                    22.5MB ± 0%    17.1MB ± 0%   -23.90%  (p=0.000 n=10+10)
Java/medium_imbalance-8           223MB ± 0%      33MB ± 0%   -85.08%  (p=0.000 n=10+10)
Java/small-8                     18.8MB ± 0%    13.7MB ± 0%   -27.32%  (p=0.000 n=10+10)
Java/small_imbalance-8            147MB ± 0%      24MB ± 0%   -83.67%  (p=0.000 n=9+10)

name                           old allocs/op  new allocs/op  delta
Large-8                           9.56k ± 0%     9.44k ± 0%    -1.31%  (p=0.000 n=9+10)
LargeWithExisting-8               34.0k ± 0%     34.0k ± 1%    -0.18%  (p=0.002 n=9+10)
LargeImbalanced-8                 9.93k ± 0%     9.82k ± 1%    -1.14%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8     33.8k ± 0%     33.8k ± 0%      ~     (p=0.183 n=10+10)
Java/large-8                      1.04M ± 0%     1.04M ± 0%      ~     (p=0.968 n=10+9)
Java/large_imbalance-8            1.04M ± 0%     1.04M ± 0%    -0.18%  (p=0.000 n=10+8)
Java/medium-8                     56.1k ± 0%     56.1k ± 0%      ~     (p=0.127 n=10+10)
Java/medium_imbalance-8           56.1k ± 0%     56.1k ± 0%      ~     (p=0.473 n=10+8)
Java/small-8                      44.9k ± 0%     44.9k ± 0%      ~     (p=0.468 n=10+10)
Java/small_imbalance-8            44.9k ± 0%     44.9k ± 0%    -0.11%  (p=0.000 n=8+10)
@showuon

showuon commented Apr 19, 2021

Copy link
Copy Markdown
Member Author

The performance comparison in jenkins for uniform subscription and non-equal subscription with the setting:

topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 13 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 17 sec | Passed
Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 14 sec | Passed

Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.4 sec | Passed
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.3 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.9 sec | Passed

I think after this PR, the performance is acceptable for non-equal subscription cases. We can have incremental improvement in the following stories. Thank you.

@showuon

showuon commented Apr 19, 2021

Copy link
Copy Markdown
Member Author

Failed tests are all flaky and unrelated. Thanks.
The fix to flaky MirrorConnectorsIntegration tests issue is in my another PR: #10547
The root cause of flaky RaftClusterTest tests will be addressed in KAFKA-12677.

    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testAddingWorker
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
    Build / JDK 11 and Scala 2.13 / kafka.server.ControllerMutationQuotaTest.testStrictDeleteTopicsRequest()
    Build / JDK 11 and Scala 2.13 / kafka.server.ReplicationQuotasTest.shouldThrottleOldSegments()
    Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithNotAllowedOverridesForPrincipalPolicy
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()

@showuon

showuon commented Apr 20, 2021

Copy link
Copy Markdown
Member Author

@guozhangwang @ableegoldman , PR is ready for review. Thank you. :)

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor questions is do we want to preserve the exact logic of prepopulateCurrentAssignments or not in this refactoring since I'm not sure if it is the case --- I'm fine if we do not really want to preserve that logic and just want to do it in a more efficient way, just bringing this up for clarification.

for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
for (int i = 0; i < entry.getValue(); ++i)
partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is this right? Wouldn't we put the same empty list for the key N times?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Updated. Thank you.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great!

for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
TopicPartition partition = partitionIter.next();
if (!partition2AllPotentialConsumers.containsKey(partition)) {
if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the following comment needs to be updated as well.

ownedPartitions.addAll(memberData.partitions);
} else if (!memberData.generation.isPresent()) {
// current maxGeneration is larger than DEFAULT_GENERATION,
// put all partitions as DEFAULT_GENERATION into provAssignment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: prev.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure if the refactored code has the exactly same logic as the old code now since its branching conditions have largely changed. E.g. do we still detect if a partition is assigned to different consumers in a generation or not?

@showuon showuon May 10, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My refactor is just trying to reach the same currentAssignment and prevAssignment as before. So, if you meant this:

if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) {
    // same partition is assigned to two consumers during the same rebalance.
    // log a warning and skip this record
    log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.",
        partition, memberData.generation);

I think this check is unnecessary since we didn't do anything to it, and we cannot do anything to it, either. That's my thought. Thanks.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

@showuon

showuon commented May 10, 2021

Copy link
Copy Markdown
Member Author

I'll have some refine to this PR. Please wait for a while . Thanks.

private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);

public static final int DEFAULT_GENERATION = -1;
public int maxGeneration = DEFAULT_GENERATION;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put the maxGeneration into class scope, so we can re-use it in prepopulateCurrentAssignments.

} else if (isAllSubscriptionsEqual && !(subscription.topics().size() == subscribedTopics.size()
&& subscribedTopics.containsAll(subscription.topics()))) {
return false;
isAllSubscriptionsEqual = false;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, we'll run through all the subscriptions since the data consumerToOwnedPartitions will also passed into generalAssign

int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
List<TopicPartition> sortedAllPartitions = getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse the getAllTopicPartitions in constrainedAssign

* @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to it
* @return the partitions don't assign to any current consumers
*/
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,

@showuon showuon May 11, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put the 2 getUnassignedPartitions (this one and the following one) overloading method together for readability

if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
// If the current member's generation is lower than maxGeneration, put into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent()) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now, we already have currentAssignment from allSubscriptionsEqual method, as well as the maxGeneration data, so we can simplify the logic here for prevAssignment only.

@showuon

showuon commented May 11, 2021

Copy link
Copy Markdown
Member Author

@ableegoldman , I've done the code refinement and refactor. Basically is what we've discussed in constrained Assignor PR. Please take a look when available.
cc @guozhangwang , welcome to take another look.
Thank you!

@showuon

showuon commented May 11, 2021

Copy link
Copy Markdown
Member Author

I saw there are cooperative sticky tests failed. I'll update it, and add more tests into it tomorrow or later. Thanks.

@showuon

showuon commented May 18, 2021

Copy link
Copy Markdown
Member Author

Broken tests are fixed and new tests are added for multiple generation tests for unequal subscription cases. Thanks.

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not have further comments here, could @vahidhashemian (author of the sticky algorithm) or @ableegoldman lend me another pair of eyes before we proceed? Thanks.

@ableegoldman

Copy link
Copy Markdown
Member

It's on my list to review in the next couple of weeks, if not sooner. Sorry I have not had time to get to this one yet, but I will 🙂 (and I agree we should also get feedback from @vahidhashemian if he sees this)

@vahidhashemian

Copy link
Copy Markdown
Contributor

Thanks for tagging me @guozhangwang @ableegoldman.
I'll try to review this within the next few days.

@showuon

showuon commented May 21, 2021

Copy link
Copy Markdown
Member Author

Thank you, guys! :)

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this detailed work and improvement. Left some initial comments.


// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
assignedPartitions = null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this null assignment needed? Don't see the variable used after this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it just tells the GC that this memory can be freed, to avoid OOM. I know in this step, we should already allocated all memories we need, but it's just in case. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is assuming the following balance() call could run beyond the next GC?
In that case imho assignedPartitions.clear() would look better (having almost the same impact).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, assignedPartitions.clear() would have the same impact, but it'll loop through all the arrayList and nullify them one by one. I think we can either null it, or remove this line. What do you think?

/**
     * Removes all of the elements from this list.  The list will
     * be empty after this call returns.
     */
    public void clear() {
        modCount++;
        final Object[] es = elementData;
        for (int to = size, i = size = 0; i < to; i++)
            es[i] = null;
    }```

@showuon

showuon commented May 24, 2021

Copy link
Copy Markdown
Member Author

@vahidhashemian , thanks for your comments. I've updated. Please take a look again. Thank you.


// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
assignedPartitions = null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is assuming the following balance() call could run beyond the next GC?
In that case imho assignedPartitions.clear() would look better (having almost the same impact).

Comment on lines +581 to 584
if (subscription.userData() != null) {
// since this is our 2nd time to deserialize memberData, rewind userData is necessary
subscription.userData().rewind();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block didn't exist before, why is it needed now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a bug after constrainedAssign implemented. After constrainedAssign implemented, we'll do allSubscriptionsEqual to decide if we want to use constrainedAssign or generalAssign. In allSubscriptionsEqual, we not only check if subscription equal, but also deserialize the user data. So, if it is deserialized once, the position of userData (ByteBuffer) will be moved to the end of the buffer, so that we have to rewind here.

@showuon

showuon commented May 25, 2021

Copy link
Copy Markdown
Member Author

@vahidhashemian , thanks for the comments. I've updated. Please take a look again. Thank you.

@vahidhashemian

Copy link
Copy Markdown
Contributor

Thanks for addressing my comments @showuon. I tested a couple of unit tests and saw the difference this change makes.
I have no further comment at this time. Given this is a big change I'd wait for @ableegoldman's review before approval. In the meantime, I may test some scenarios for additional validation.

@showuon

showuon commented May 26, 2021

Copy link
Copy Markdown
Member Author

@vahidhashemian , thank you for your review! :)

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a chance to run some tests against this pr and they all went fine. +1 from me. Thanks again for the improvements.

@guozhangwang

Copy link
Copy Markdown
Contributor

The failed tests are irrelevant to this PR, I'm merging to trunk now.

@guozhangwang guozhangwang merged commit 6db51e4 into apache:trunk Jun 2, 2021
@guozhangwang

Copy link
Copy Markdown
Contributor

Thank you @showuon !!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants