[improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections get orphaned #174
…umers when TCP/IP connections hang
- make Dispatch.addConsumer and StickyKeyConsumerSelector.addConsumer asynchronous
- add connectionLivenessCheckTimeoutMillis setting that defaults to 5000ms
- when a conflicting consumer is found, first do an active check with Pulsar's Ping command to see if the connection is alive
- resume the attempt to add the consumer after the check completes
3db054a to
5413b8f
Compare
eolivelli
left a comment
Overall LGTM
I left some comments PTAL
return validateKeySharedMeta(consumer).thenRun(() -> {
    try {
        internalAddConsumer(consumer);
    } catch (BrokerServiceException.ConsumerAssignException e) {
Should we also catch RuntimeException here and in other places? (I am afraid of unwanted IllegalArgumentException, ArrayIndexOutOfBoundsException, ...)
I don't think that it is necessary since thenRun will catch exceptions and complete the future with the exception in any case.
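The point about thenRun can be checked with a small standalone sketch. It is not code from this PR; the class and method names are hypothetical, and it only uses the JDK's CompletableFuture. An unchecked exception thrown inside the thenRun action does not escape to the caller; the returned future completes exceptionally, and join() surfaces it wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not part of Pulsar.
final class ThenRunExceptionDemo {

    // Mimics the shape of the reviewed code: an action chained with thenRun
    // that throws an unchecked exception instead of completing normally.
    static CompletableFuture<Void> addConsumerLikeCall() {
        return CompletableFuture.completedFuture(null).thenRun(() -> {
            throw new IllegalArgumentException("unexpected");
        });
    }

    public static void main(String[] args) {
        // The call itself does not throw; the failure is captured in the future.
        CompletableFuture<Void> result = addConsumerLikeCall();
        System.out.println(result.isCompletedExceptionally()); // prints "true"
        try {
            result.join();
        } catch (CompletionException e) {
            // The original exception is available as the cause.
            System.out.println(e.getCause().getClass().getSimpleName()); // prints "IllegalArgumentException"
        }
    }
}
```

Callers that inspect the failure type should unwrap the CompletionException cause before matching on it.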
}

try {
    dispatcher.addConsumer(consumer);
This is great! The code already expected a CP
yes, it fits well into that.
upstream PR apache#20026, I'll address the review comments soon
@michaeljmarshall please review and merge
michaeljmarshall
left a comment
LGTM
Uses the same solution that Lari had in the upstream PR.
There were some test failures. I noticed that the upstream PR had fixes, so I just copied them here.
The broker test is failing because I pushed a cherry-picked commit that wasn't passing tests. I just pushed 0c365e2 to fix the test. I triggered a rerun on the test. I'm not sure if CI will pick up that latest commit or if I'll need to merge 2.10_ds into this branch. We'll see.
The CI logs indicated we're running against the commit without the fix. I merged
Motivation
In Kubernetes upgrades, it's possible to get into a state where a TCP/IP connection seems to be alive on the broker. The connection is orphaned and therefore consuming resources and causing resource conflicts with producers and consumers.
This PR targets the issue for consumers that use the Key Shared subscription type with sticky hash ranges. It prevents the "Range conflict with consumer ..." error, which happens when the previous connection from the client is still active on the broker while the client is reconnecting on a new connection.
Modifications
- Make Dispatch.addConsumer and StickyKeyConsumerSelector.addConsumer asynchronous
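
The flow in the commit description (check the conflicting connection's liveness first, then resume the add) might be sketched roughly as follows. This is a simplified illustration, not the PR's implementation: StickyRangeSketch, Connection, and checkLiveness are hypothetical names, a hash range is reduced to a plain string key, and treating a timed-out check as "not alive" is an assumption of this sketch. It requires Java 9 or later for completeOnTimeout and failedFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; none of these types are Pulsar's API.
final class StickyRangeSketch {

    /** Hypothetical stand-in for a broker-side client connection. */
    interface Connection {
        /** Completes with true if the peer answered an active check (e.g. a ping). */
        CompletableFuture<Boolean> checkLiveness();
    }

    // Mirrors the 5000 ms default that the PR gives connectionLivenessCheckTimeoutMillis.
    static final long LIVENESS_CHECK_TIMEOUT_MILLIS = 5000;

    // Owner of each sticky range (simplified to one string key per range).
    private final Map<String, Connection> owners = new ConcurrentHashMap<>();

    CompletableFuture<Void> addConsumer(String range, Connection candidate) {
        Connection existing = owners.putIfAbsent(range, candidate);
        if (existing == null) {
            // No conflict: the range is now assigned to the candidate.
            return CompletableFuture.completedFuture(null);
        }
        // Conflict: actively check the current owner before rejecting the newcomer.
        return existing.checkLiveness()
                // Assumption of this sketch: no answer within the timeout counts as dead.
                .completeOnTimeout(false, LIVENESS_CHECK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
                .thenCompose(alive -> {
                    if (alive) {
                        return CompletableFuture.<Void>failedFuture(
                                new IllegalStateException("Range conflict with consumer on " + range));
                    }
                    // Owner looks orphaned: drop it and resume the attempt to add.
                    owners.remove(range, existing);
                    return addConsumer(range, candidate);
                });
    }
}
```

Because the liveness check is asynchronous, addConsumer has to return a future rather than throw synchronously, which is why the add methods are made asynchronous.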