-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections get orphaned #20026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections get orphaned #20026
Conversation
d9c6aef to
4203566
Compare
…umers when TCP/IP connections hang - make Dispatch.addConsumer and StickyKeyConsumerSelector.addConsumer asynchronous - add connectionLivenessCheckTimeoutMillis setting that defaults to 5000ms - when a conflicting consumer is found, first do an active check with Pulsar's Ping command to see if the connection is alive. - resume the attempt to add the consumer after the check completes
4203566 to
ef9698e
Compare
michaeljmarshall
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great work @lhotari!
| public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException { | ||
| validateKeySharedMeta(consumer); | ||
| public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) { | ||
| return validateKeySharedMeta(consumer).thenRun(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: a minor optimization could assign validateKeySharedMeta to a local variable, and then check if that future is already completed. When it is, we know we had the synchronized lock when the validation was done, which would allow us to skip the secondary call to findConflictingConsumer. I am not sure how expensive that call is, but I assume it has some cost that adds up with many key shared consumers.
Since it is an optimization, I don't think we should hold up this PR for that.
…umers when TCP/IP connections get orphaned (apache#20026)
…umers when TCP/IP connections get orphaned (#174) upstream PR apache#20026
|
The PR #21155 fixes an issue in which the producer sends messages timeout due to the inability to reconnect successfully. The root cause is the client created a new connection to reregister the producer after it assumed the old client was invalidated, but at the same time, the broker assumed the old connection was still validated, so the client got an error "Producer with name 'st-0-5' is already connected to topic". The fix in the PR #21155 tries to start a new heartbeat after the broker receives different connections for the same producer registration. In this fix, the PR #21155 uses a tool method I also send a discuss to do this. |
Motivation
In Kubernetes upgrades, it's possible to get into a state where a TCP/IP connection seems to be alive on the broker. The connection is orphaned and therefore consuming resources and causing resource conflicts with producers and consumers.
This PR intends to target the issue with consumers that are using the Key shared subscription type with sticky hash ranges. The prevents the issue of "Range conflict with consumer ..." which happens when the previous connection from the client is still active on the broker while the client is reconnecting on a new connection.
Modifications
Dispatch.addConsumerandStickyKeyConsumerSelector.addConsumerasynchronousDocumentation
docdoc-requireddoc-not-neededdoc-complete