-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][client] Terminate consumer.receive() when consumer is closed #24550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! left some review comments
...mmon/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
Outdated
Show resolved
Hide resolved
...mmon/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
Show resolved
Hide resolved
...mmon/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
thank you so such for pointing these out, I've pushed a new commit, please take a look again. |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, good work @3pacccccc
...mmon/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a need to handle the case where more than one thread is waiting.
IMO, because of the |
|
@lhotari yes! you're right. Thank you so much for pointing this out, I've pushed a new commit, PTAL |
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, good work @3pacccccc
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #24550 +/- ##
============================================
+ Coverage 73.57% 74.26% +0.69%
+ Complexity 32624 32595 -29
============================================
Files 1877 1876 -1
Lines 139502 146326 +6824
Branches 15299 16781 +1482
============================================
+ Hits 102638 108673 +6035
- Misses 28908 29021 +113
- Partials 7956 8632 +676
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
apache#24550) (cherry picked from commit 6ad57da)
apache#24550) (cherry picked from commit 6ad57da)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit 4dd5b1e)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit ba9147a)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit 4dd5b1e)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit ba9147a)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit ba9147a)
apache#24550) (cherry picked from commit 6ad57da) (cherry picked from commit 4dd5b1e)
Motivation
Currently, when
consumer.receive()is called, it blocks indefinitely even when the consumer is being closed. This can lead to threads hanging indefinitely in applications that need to gracefully shutdown consumers. The issue can be reproduced with:in fact, this could happen in many scenario as long as
consumer.receive()called beforeconsumer.close()so, this PR aims to ensure that
consumer.receive()will immediately throw anIllegalStateExceptionwhen the consumer is being closed, providing clear feedback and preventing thread hangsModifications
1.Added termination handling in GrowableArrayBlockingQueue to:
Check termination flag in take() and poll() methods
Throw IllegalStateException with "Queue is terminated" message when terminated
2.Modified
ConsumerImpl.closeConsumerTasks()to terminate the incoming message queue3.Added tests covering:
Regular consumer close during receive()
Zero queue size consumer receives a batchMessage and another thread close consuer
Verifying this change
Make sure that the change passes the CI checks.
If the box was checked, please highlight the changes
Dependencies (add or upgrade a dependency)
The public API
The schema
The default values of configurations
The threading model
The binary protocol
The REST endpoints
The admin CLI options
The metrics
Anything that affects deployment
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: 3pacccccc#15