-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][client]TopicListWatcher not closed when calling PatternMultiTopicsConsumerImpl.closeAsync() method #24698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client]TopicListWatcher not closed when calling PatternMultiTopicsConsumerImpl.closeAsync() method #24698
Conversation
|
@oneby-wang Please add the following content to your PR description and select a checkbox: |
…icsConsumerImpl.closeAsync() method
8aa9a41 to
5694f22
Compare
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! Please check the review comments.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
…letely serial manner
…y and add volatile on PatternConsumerUpdateQueue.closed field
|
Hi, @lhotari thanks for your code review and suggestion, I didn't go that far before, I encountered this problem when I contributed to spring-pulsar, see DefaultPulsarConsumerFactoryTests.java. First, ignore errors or not? I think throwing Exception is very important(for debug reason), unless error messages already printed. So it is safe to ignore the following two errors and continue
Second, close execution order. But one thing, add pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java Line 76 in 2e6a8eb
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java Lines 280 to 283 in 2e6a8eb
Undoubtedly, As you suggested and my replied content, I applied the code review change and pushed to this pull request. |
|
Sorry for my mistake, closed field doesn't need volatile, it is protected by synchronized. |
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Hi, @lhotari there is a test failure in Pulsar CI / CI - Unit - Pulsar Client (pull_request), I think the problem is pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java Lines 92 to 107 in 2e6a8eb
My local test output: I read the source code of pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java Lines 1123 to 1151 in 2e6a8eb
|
|
Hi, @lhotari and another test failure, I can't see any detailed error message, need more input help. Pulsar CI / CI - System - Pulsar Connectors - Thread (pull_request) |
Pulsar contains also a lot of flaky tests and for PRs you can trigger a retry with In this case, it wasn't a test failure, but the build just didn't succeed in downloading the docker image that is built in another step as part of the workflow. A retry with Getting test errors for integration tests / system tests requires downloading the surefire zip file from GitHub Actions build artifacts for the build and inspecting the files. However, it wasn't the case here. |
|
/pulsarbot rerun-failure-checks |
|
@oneby-wang Perhaps you already noticed that this pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java Lines 88 to 97 in c358e71
it might also be useful to hold a reference to the TopicListWatcher instance in a field so that it could be closed explicitly. |
Need some more changes, please check comments
|
Thanks for input, I'll try to fix test failures. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #24698 +/- ##
============================================
- Coverage 74.33% 74.12% -0.21%
+ Complexity 33534 33434 -100
============================================
Files 1895 1895
Lines 147954 147957 +3
Branches 17130 17130
============================================
- Hits 109978 109671 -307
- Misses 29256 29527 +271
- Partials 8720 8759 +39
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a resource leak issue where TopicListWatcher was not properly closed when PatternMultiTopicsConsumerImpl.closeAsync() was called. The original implementation only closed the watcher if watcherFuture.isDone() was true, which could leave the watcher unclosed in certain scenarios.
- Modified
PatternMultiTopicsConsumerImpl.closeAsync()to ensureTopicListWatcheris always closed regardless of future completion status - Added timeout handling to
TopicListWatcherconstructor and connection failure logic to prevent indefinite waiting - Improved error logging to distinguish between non-retriable errors and timeout scenarios
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
PatternMultiTopicsConsumerImpl.java |
Refactored closeAsync() method to guarantee watcher closure using thenCompose() instead of checking isDone() |
TopicListWatcher.java |
Added timeout tracking with deadline field and enhanced connection failure handling to support timeout scenarios |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
Show resolved
Hide resolved
|
@coderzc @Technoboy- I think that the timeout solution could be a problem and that this PR was merged too early. The Pulsar Client is designed in a way where it should indefinitely keep on retrying until the client gets closed. This PR breaks that principle. |
|
I created issue #24707 that should be addressed. |
…icsConsumerImpl.closeAsync() method (#24698)
…icsConsumerImpl.closeAsync() method (apache#24698) (cherry picked from commit cc824e5) (cherry picked from commit ab5ccf4)
…icsConsumerImpl.closeAsync() method (apache#24698) (cherry picked from commit cc824e5) (cherry picked from commit ab5ccf4)
…icsConsumerImpl.closeAsync() method (apache#24698) (cherry picked from commit cc824e5)
…icsConsumerImpl.closeAsync() method (apache#24698) (cherry picked from commit cc824e5) (cherry picked from commit 64e2dc2)
…icsConsumerImpl.closeAsync() method (apache#24698) (cherry picked from commit cc824e5) (cherry picked from commit 64e2dc2)
…icsConsumerImpl.closeAsync() method (apache#24698)
…icsConsumerImpl.closeAsync() method (apache#24698)
Motivation
In PatternMultiTopicsConsumerImpl, if watcherFuture is not done, the closeAsync() method will not close TopicListWatcher.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
Lines 388 to 395 in 2e6a8eb
Modifications
Re-write the closeAsync() method to make sure TopicListWatcher is closed. See file changes.
Verifying this change
This change is already covered by existing tests, see PatternMultiTopicsConsumerImplTest.
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-complete