[improve][broker] PIP-307: Implement broker and client consumer changes when topic is unloading #21682
heesung-sohn
left a comment
Also, I would like to confirm the following.
When topic.isTransferring() == true, we would like to implement the following consumer logic to make this topic transfer graceful.
- ignore readEntry error (when the ledger is closed)
- stop msg delivery to consumers (close subscription and dispatcher)
- ignore in-flight and completed readEntry (do not send to the consumers)
- ignore ack msg
Could you discuss whether we need to handle the above cases further?
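To make the first three items concrete, here is a minimal sketch of a transfer-aware read path. It is not Pulsar's dispatcher: `TransferAwareDispatcher`, `markTransferring`, `onReadEntriesComplete`, and `onReadEntriesFailed` are hypothetical names chosen for illustration, and the real change may well be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a dispatcher; NOT Pulsar's real class or API.
final class TransferAwareDispatcher {
    private final AtomicBoolean transferring = new AtomicBoolean(false);
    private final List<String> delivered = new ArrayList<>();
    private int ignoredReadErrors = 0;

    // Assumed hook: called once the topic starts moving to another broker.
    void markTransferring() {
        transferring.set(true);
    }

    // Read completion callback. Returns how many entries were delivered.
    synchronized int onReadEntriesComplete(List<String> entries) {
        if (transferring.get()) {
            // In-flight or completed reads are dropped, not sent to consumers.
            return 0;
        }
        delivered.addAll(entries);
        return entries.size();
    }

    // Read failure callback. Returns true when the error was swallowed.
    synchronized boolean onReadEntriesFailed(Exception cause) {
        if (transferring.get()) {
            // For example, the ledger was closed as part of the transfer.
            ignoredReadErrors++;
            return true;
        }
        return false;
    }

    synchronized int deliveredCount() {
        return delivered.size();
    }

    synchronized int ignoredReadErrorCount() {
        return ignoredReadErrors;
    }
}
```

In this sketch a single flag gates both callbacks, so a read that was issued before the transfer started but completes afterwards is handled the same way as one issued during the transfer.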
heesung-sohn
left a comment
Hi, I think we have covered items 2 and 3 so far. Can we confirm whether items 1 and 4 in this list are covered as well?
- ignore readEntry error (when the ledger is closed)
- stop msg delivery to consumers (close subscription and dispatcher)
- ignore in-flight and completed readEntry (do not send to the consumers)
- ignore ack msg
gaoran10
left a comment
Great work! Left some comments.
    if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
        Consumer consumer = consumerFuture.getNow(null);
        Subscription subscription = consumer.getSubscription();
        if (subscription.getTopic().isTransferring()) {
Do we have a test that covers acks received while the topic is transferring?
Not specifically, but the modified test covers ack behavior: https://github.com/apache/pulsar/pull/21682/files#diff-744119c61c9f6a1b786c3966acd1e0f63748985cf68a6a1a15418c8d9900a9a8R543-R547. The test does not succeed unless all messages sent during the transfer are received by the consumers. Since 200 messages are involved, at least one of them is likely to have its acknowledgement initially ignored.
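For readers following the thread, the guard under discussion can be illustrated in isolation. This is a sketch only: `AckGuardSketch`, its nested `Topic` and `Consumer` classes, and `handleAck` are hypothetical stand-ins, not the Pulsar types of the same names, and the pending-set bookkeeping is an assumption made to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the ack guard; not Pulsar's real classes.
final class AckGuardSketch {
    static final class Topic {
        volatile boolean transferring;
    }

    static final class Consumer {
        final Topic topic;
        final Set<Long> pending = new HashSet<>(); // unacknowledged message ids

        Consumer(Topic topic) {
            this.topic = topic;
        }
    }

    // Returns true only if the ack was applied to a pending message.
    static boolean handleAck(CompletableFuture<Consumer> consumerFuture, long messageId) {
        if (consumerFuture != null && consumerFuture.isDone()
                && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (consumer.topic.transferring) {
                // Ack is ignored: the message stays pending on this broker.
                return false;
            }
            return consumer.pending.remove(messageId);
        }
        return false; // consumer not ready, nothing to acknowledge against
    }
}
```

Under these assumptions an ignored ack leaves the message unacknowledged, which matches the reasoning above: a test that requires every message to reach the consumers would fail if such a message were lost rather than left pending.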
…ersistent/PersistentTopic.java Co-authored-by: Kai Wang <kwang@streamnative.io>
…ersistent/PersistentDispatcherMultipleConsumers.java Co-authored-by: Penghui Li <penghui@apache.org>
It looks like this PR broke the test SubscriptionMessageDispatchThrottlingTest.testClosingRateLimiter (#21756). The fix is in #21736. @dragosvictor @heesung-sn @gaoran10 Please review.
PIP: PIP-307
Motivation
Topic unloading can be sped up, as described by PIP-307, by forwarding the target broker lookup data to clients (both producers and consumers). PR #21408 added support for the producers; this PR introduces similar functionality for the consumers.
Modifications
When the load balancer reassigns a bundle from an old 'source' broker to a new 'target' broker, the consumers are currently forcefully disconnected. The expectation is that they will issue topic lookup calls to locate the new broker.
This PR forwards this information to the consumers as part of the disconnect workflow, allowing them to connect directly to the target broker and skip the lookups in the process.
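The client-side behavior this enables (one direct attempt against the forwarded broker, then a fallback to a regular topic lookup) can be sketched as follows. This is an illustration under stated assumptions, not the Pulsar client implementation: `ReconnectSketch`, `reconnect`, and the `tryConnect` and `lookup` callbacks are hypothetical names.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of the reconnect decision; not the Pulsar client API.
final class ReconnectSketch {
    /**
     * Returns the broker URL the client ended up connected to.
     *
     * @param assignedBrokerUrl URL forwarded by the source broker, if any
     * @param tryConnect        assumed callback: true when the connection succeeds
     * @param lookup            assumed callback: performs a regular topic lookup
     */
    static String reconnect(Optional<String> assignedBrokerUrl,
                            Predicate<String> tryConnect,
                            Supplier<String> lookup) {
        // Single direct attempt against the forwarded target broker.
        if (assignedBrokerUrl.isPresent() && tryConnect.test(assignedBrokerUrl.get())) {
            return assignedBrokerUrl.get();
        }
        // Fallback: locate the owner through a lookup, as before this change.
        String lookedUp = lookup.get();
        if (!tryConnect.test(lookedUp)) {
            throw new IllegalStateException("could not connect to " + lookedUp);
        }
        return lookedUp;
    }
}
```

When the direct attempt succeeds, the `lookup` callback is never invoked, which is the saving the PR is after. A real client would presumably retry with backoff rather than throw; that part is omitted here.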
Specifically:
- Adds `assignedBrokerLookupData` to the `CloseConsumer` command. The consumer uses this information to attempt to connect to the broker once. If the connection fails, it falls back to topic lookups.

Verifying this change
This change added tests and can be verified as follows:
- `testTransferClientReconnectionWithoutLookup` and `testUnloadClientReconnectionWithLookup` cover both producer and consumer behavior, as well as all subscription types. Ran the suite 100 times to make sure it was stable.
- `pulsar-perf` run in a k8s environment. Asserted that all messages pass through during a load balancing run, while no lookups are performed by the clients.

Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete

Matching PR in forked repository
PR in forked repository: (dragosvictor#1)