
Conversation

@lhotari lhotari commented Apr 5, 2023

Motivation

In Kubernetes upgrades, it's possible to get into a state where a TCP/IP connection still seems to be alive on the broker even though it has been orphaned, so it keeps consuming resources and causes resource conflicts with producers and consumers.
This PR targets the issue for consumers that use the Key Shared subscription type with sticky hash ranges. It prevents the "Range conflict with consumer ..." error, which occurs when the previous connection from the client is still active on the broker while the client is reconnecting on a new connection.

Modifications

  • make Dispatcher.addConsumer and StickyKeyConsumerSelector.addConsumer asynchronous
  • add a connectionLivenessCheckTimeoutMillis setting that defaults to 5000ms
  • when a conflicting consumer is found, first do an active check with Pulsar's Ping command to see if the connection is alive
  • resume the attempt to add the consumer after the check completes (see the sketch below)
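
As a rough, self-contained sketch of the flow described above (all class and method names here are illustrative stand-ins, not the actual Pulsar broker API; only the setting name and its 5000ms default come from this PR):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the liveness-check flow when a sticky hash range conflict is found.
class LivenessCheckSketch {
    // Matches the new connectionLivenessCheckTimeoutMillis default of 5000ms.
    static final long CONNECTION_LIVENESS_CHECK_TIMEOUT_MILLIS = 5000;

    // Stand-in for sending Pulsar's Ping command on the existing connection
    // and waiting for the matching Pong.
    static CompletableFuture<Boolean> checkConnectionLiveness() {
        return CompletableFuture.completedFuture(false); // pretend the peer is gone
    }

    // Called when a new consumer's sticky hash range conflicts with a consumer
    // on a possibly orphaned connection.
    static CompletableFuture<Void> resolveConflictThenAdd(Runnable internalAdd) {
        return checkConnectionLiveness()
                // No Pong within the timeout: treat the old connection as dead.
                .completeOnTimeout(false, CONNECTION_LIVENESS_CHECK_TIMEOUT_MILLIS,
                        TimeUnit.MILLISECONDS)
                .thenAccept(alive -> {
                    if (alive) {
                        // The previous connection really is alive: genuine conflict.
                        throw new IllegalStateException("Range conflict with consumer ...");
                    }
                    // Orphaned connection: resume the attempt to add the consumer.
                    internalAdd.run();
                });
    }
}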

@lhotari changed the title from "[improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections hang" to "[improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections get orphaned" on Apr 5, 2023
@lhotari force-pushed the lh-broker-dead-connection-detection-ls210 branch from 3db054a to 5413b8f on April 5, 2023 13:57
@eolivelli (Collaborator) left a comment


Overall LGTM

I left some comments, PTAL.

return validateKeySharedMeta(consumer).thenRun(() -> {
    try {
        internalAddConsumer(consumer);
    } catch (BrokerServiceException.ConsumerAssignException e) {
@eolivelli (Collaborator):

should we also catch RuntimeException here and in other places? (I am afraid of unwanted IllegalArgumentException, ArrayIndexOutOfBoundsException, ...)

@lhotari (Author):

I don't think it's necessary, since thenRun will catch the exception and complete the resulting future exceptionally in any case.
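
For illustration, a minimal standalone example of that behavior (not code from this PR):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// An exception thrown inside thenRun does not escape unhandled: it completes
// the downstream future exceptionally, and callers observe it on join()/get().
public class ThenRunDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> f = CompletableFuture.completedFuture(null)
                .thenRun(() -> {
                    throw new IllegalArgumentException("boom");
                });
        try {
            f.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause()); // java.lang.IllegalArgumentException: boom
        }
    }
}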

}

try {
    dispatcher.addConsumer(consumer);
@eolivelli (Collaborator):

This is great! The code already expected a CompletableFuture.

@lhotari (Author):

yes, it fits well into that.
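
As an aside, a hypothetical call-site sketch of what "fits well" means here (Dispatcher and Consumer are stub interfaces for illustration, not the real Pulsar types):

import java.util.concurrent.CompletableFuture;

// Shows how a call site can compose the future returned by the now-asynchronous
// addConsumer instead of expecting it to succeed synchronously.
public class AsyncAddConsumerSketch {
    interface Consumer { String name(); }
    interface Dispatcher { CompletableFuture<Void> addConsumer(Consumer c); }

    static CompletableFuture<Void> subscribe(Dispatcher dispatcher, Consumer consumer) {
        return dispatcher.addConsumer(consumer)
                .thenRun(() -> System.out.println("added " + consumer.name()))
                .exceptionally(ex -> {
                    // e.g. surface a "Range conflict with consumer ..." failure
                    System.out.println("failed to add " + consumer.name() + ": " + ex);
                    return null;
                });
    }
}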

@lhotari (Author) commented Apr 6, 2023

Upstream PR: apache#20026. I'll address the review comments soon.

@lhotari (Author) commented Apr 12, 2023

@michaeljmarshall please review and merge

@michaeljmarshall (Member) left a comment


LGTM

Uses the same solution that Lari had in the upstream PR.
@michaeljmarshall (Member):

There were some test failures. I noticed that the upstream PR had fixes, so I just copied them here.

@michaeljmarshall (Member):

The broker test is failing because I pushed a cherry-picked commit that wasn't passing tests. I just pushed 0c365e2 to fix the test and triggered a rerun. I'm not sure if CI will pick up that latest commit or if I'll need to merge 2.10_ds into this branch. We'll see.

@michaeljmarshall (Member):

The CI logs indicated we were running against the commit without the fix, so I merged 2.10_ds into this branch to pick it up.

@lhotari merged commit 167ff8b into datastax:2.10_ds on Apr 13, 2023