
@shibd
Member

@shibd shibd commented Mar 20, 2025

Motivation

A user thread may call subscribe() (which creates a consumer) on a topic and then be interrupted before it receives the response to the consumer creation.

For example:

  • Thread A - call subscribe() method
  • Thread A - blocking on waiting for the response of the consumer creation
  • The user framework interrupted Thread A for aborting a job.
  • Cannot close the consumer since user will get InterruptedException and in this case user can't get consumer object
  • So, in the end, this consumer is leaked.
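
The flow above can be replayed without a broker. The following is a minimal sketch, not Pulsar code: `FakeConsumer`, the `openConsumers` registry, and the `pending` future are hypothetical stand-ins for the real consumer, the client's consumer tracking, and the asynchronous creation response.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

final class ConsumerLeakSketch {
    /** Hypothetical stand-in for a consumer that stays registered until it is closed. */
    static final class FakeConsumer {
        private final Set<FakeConsumer> registry;

        FakeConsumer(Set<FakeConsumer> registry) {
            this.registry = registry;
            registry.add(this);
        }

        void close() {
            registry.remove(this);
        }
    }

    /** Blocking subscribe without interruption handling: it only waits for the async result. */
    static FakeConsumer subscribe(CompletableFuture<FakeConsumer> pending)
            throws InterruptedException, ExecutionException {
        return pending.get();
    }

    /** Replays the scenario and returns the number of consumers left open. */
    static int openConsumersAfterInterrupt() {
        Set<FakeConsumer> openConsumers = ConcurrentHashMap.newKeySet();
        CompletableFuture<FakeConsumer> pending = new CompletableFuture<>();
        CountDownLatch gaveUp = new CountDownLatch(1);

        Thread threadA = new Thread(() -> {
            try {
                subscribe(pending);
            } catch (InterruptedException | ExecutionException e) {
                // Thread A holds no consumer reference, so it has nothing to close.
                gaveUp.countDown();
            }
        });
        threadA.start();
        threadA.interrupt(); // the framework aborts the job
        try {
            gaveUp.await();
            // The creation response arrives after the caller has already given up.
            pending.complete(new FakeConsumer(openConsumers));
            threadA.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return openConsumers.size();
    }

    public static void main(String[] args) {
        System.out.println("open consumers: " + openConsumersAfterInterrupt());
    }
}
```

Under these assumptions one consumer remains registered after Thread A has exited, and nothing in the program holds a reference that could be used to close it.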

Modifications

  • Added a variable named interruptedBeforeConsumerCreation. When this variable is true, the consumer needs to be closed.
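
As a rough illustration of the idea (not the actual ConsumerBuilderImpl change), the sketch below uses a flag with the same name. `FakeConsumer`, the registry, and the placement of the callback are assumptions made for the example; the extra check in the catch block covers the case where creation finishes just before the flag is set.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptFlagSketch {
    /** Hypothetical stand-in for a consumer that stays registered until it is closed. */
    static final class FakeConsumer {
        private final Set<FakeConsumer> registry;

        FakeConsumer(Set<FakeConsumer> registry) {
            this.registry = registry;
            registry.add(this);
        }

        void close() {
            registry.remove(this); // idempotent in this sketch
        }
    }

    /** Blocking subscribe that closes the consumer if the caller was interrupted first. */
    static FakeConsumer subscribe(CompletableFuture<FakeConsumer> pending)
            throws InterruptedException {
        AtomicBoolean interruptedBeforeConsumerCreation = new AtomicBoolean(false);
        // Runs when creation finishes, possibly after the caller has given up.
        pending.thenAccept(consumer -> {
            if (interruptedBeforeConsumerCreation.get()) {
                consumer.close();
            }
        });
        try {
            return pending.get();
        } catch (InterruptedException e) {
            interruptedBeforeConsumerCreation.set(true);
            // Creation may have finished just before the flag was set.
            if (pending.isDone() && !pending.isCompletedExceptionally()) {
                pending.join().close();
            }
            throw e;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Replays the interrupted-subscribe scenario and returns the consumers left open. */
    static int openConsumersAfterInterrupt() {
        Set<FakeConsumer> openConsumers = ConcurrentHashMap.newKeySet();
        CompletableFuture<FakeConsumer> pending = new CompletableFuture<>();
        CountDownLatch gaveUp = new CountDownLatch(1);

        Thread threadA = new Thread(() -> {
            try {
                subscribe(pending);
            } catch (InterruptedException e) {
                gaveUp.countDown();
            }
        });
        threadA.start();
        threadA.interrupt();
        try {
            gaveUp.await();
            pending.complete(new FakeConsumer(openConsumers)); // late creation response
            threadA.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return openConsumers.size();
    }

    public static void main(String[] args) {
        System.out.println("open consumers: " + openConsumersAfterInterrupt());
    }
}
```

The caller still gets the InterruptedException and no consumer object. The only difference from the leaking version is that the late-arriving consumer is closed on the caller's behalf.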

Verifying this change

  • You can run testInterruptedWhenCreateConsumer to reproduce this issue.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@shibd shibd self-assigned this Mar 20, 2025
@github-actions github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label Mar 20, 2025
@shibd shibd force-pushed the improve_consumer_close branch from 0c22173 to e7922a3 March 20, 2025 07:35
@codecov-commenter

codecov-commenter commented Mar 20, 2025

Codecov Report

Attention: Patch coverage is 88.88889% with 2 lines in your changes missing coverage. Please review.

Project coverage is 74.24%. Comparing base (bbc6224) to head (e7922a3).
Report is 980 commits behind head on master.

Files with missing lines Patch % Lines
...apache/pulsar/client/impl/ConsumerBuilderImpl.java 88.88% 2 Missing ⚠️
Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24100      +/-   ##
============================================
+ Coverage     73.57%   74.24%   +0.66%     
+ Complexity    32624    32452     -172     
============================================
  Files          1877     1863      -14     
  Lines        139502   144327    +4825     
  Branches      15299    16458    +1159     
============================================
+ Hits         102638   107150    +4512     
+ Misses        28908    28710     -198     
- Partials       7956     8467     +511     
Flag Coverage Δ
inttests 26.71% <50.00%> (+2.12%) ⬆️
systests 23.19% <50.00%> (-1.14%) ⬇️
unittests 73.74% <88.88%> (+0.90%) ⬆️

Files with missing lines Coverage Δ
...apache/pulsar/client/impl/ConsumerBuilderImpl.java 86.87% <88.88%> (-0.07%) ⬇️

... and 1066 files with indirect coverage changes

@lhotari
Member

lhotari commented Mar 20, 2025

  • Cannot close the consumer since user will get InterruptedException and can't get consumer object

@shibd Good work! It would be useful to include in this PR a failing test case that reproduces the problem. Could you please add one? It looks like it would be a slight variation of the test case you have already added. The "can't get consumer object" part isn't currently tested; a test would clarify what this actually means and show whether the consumer object can be obtained once the issue is fixed.

@shibd
Member Author

shibd commented Mar 20, 2025

The "can't get consumer object" part isn't currently tested; a test would clarify what this actually means and show whether the consumer object can be obtained once the issue is fixed.

In this case, regardless of this fix, the user won't get the consumer object, which is expected.

That's because subscribe throws an exception; it's not the focus. The focus is that we need to close this consumer.

@lhotari
Member

lhotari commented Mar 20, 2025

In this case, regardless of this fix, the user won't get the consumer object, which is expected.

That's because subscribe throws an exception; it's not the focus. The focus is that we need to close this consumer.

@shibd My point was that when there's a bug, it's good practice to add a failing test case that reproduces and clarifies the issue. In this case "the user won't get the consumer object, which is expected" could be understood in many ways. It might be clear to you, but not to code reviewers. That's why failing test cases that reproduce the exact problem are always desirable when it's possible to add one. Obviously, for many Pulsar bugs that's really hard. In this case, it most likely isn't.

@shibd
Member Author

shibd commented Mar 20, 2025

You can run testInterruptedWhenCreateConsumer to reproduce it.

@lhotari
Member

lhotari commented Mar 20, 2025

You can run testInterruptedWhenCreateConsumer to reproduce it.

@shibd That doesn't reproduce the flow you described in the PR description.

For example:

  • Thread A - call subscribe() method
  • Thread A - blocking on waiting for the response of the consumer creation
  • The user framework interrupted Thread A for aborting a job.
  • Cannot close the consumer since user will get InterruptedException and can't get consumer object

The step "Cannot close the consumer since user will get InterruptedException and can't get consumer object" is missing. Adding a separate test case that reproduces the user's flow would be useful, as I explained before. Simply put it in code, in a test case that clarifies what "Cannot close the consumer since user will get InterruptedException and can't get consumer object" means. That's the simplest way, without wasting time writing comments. :)
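
For illustration only, a test written from the user's side might look roughly like the sketch below. It is not the test in this PR and does not use Pulsar classes: `FakeClient`, its `openConsumers` set, and `brokerResponse` are hypothetical stand-ins, and an ExecutorService plays the role of the user framework that aborts the job with `Future.cancel(true)`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

final class UserFlowSketch {
    /** Hypothetical client stand-in that tracks its open consumers; not a Pulsar class. */
    static final class FakeClient {
        final Set<Object> openConsumers = ConcurrentHashMap.newKeySet();
        final CompletableFuture<Object> brokerResponse = new CompletableFuture<>();

        /** Blocking subscribe that cleans up if the caller is interrupted while waiting. */
        Object subscribe() throws InterruptedException {
            CompletableFuture<Object> created = brokerResponse.thenApply(consumer -> {
                openConsumers.add(consumer);
                return consumer;
            });
            try {
                return created.get();
            } catch (InterruptedException e) {
                // "Close" the consumer whenever creation finishes, then report the interruption.
                created.thenAccept(openConsumers::remove);
                throw e;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }

    /** Returns true if the user got no consumer object and no consumer was left open. */
    static boolean interruptedUserFlowLeavesNoOpenConsumer() {
        FakeClient client = new FakeClient();
        ExecutorService framework = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch userSawInterrupt = new CountDownLatch(1);
        AtomicReference<Object> userConsumer = new AtomicReference<>();
        try {
            Future<?> job = framework.submit(() -> {
                started.countDown();
                try {
                    userConsumer.set(client.subscribe());
                } catch (InterruptedException e) {
                    userSawInterrupt.countDown(); // the user has no consumer to close
                }
            });
            started.await();
            job.cancel(true); // the framework aborts the job by interrupting its thread
            userSawInterrupt.await();
            client.brokerResponse.complete(new Object()); // creation finishes afterwards
            return userConsumer.get() == null && client.openConsumers.isEmpty();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            framework.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptedUserFlowLeavesNoOpenConsumer());
    }
}
```

The check has two parts: the user code never receives a consumer object, and the client ends up with no open consumer. Against a real client, the second part would have to be observed through whatever the client exposes for tracking consumers.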

@shibd
Member Author

shibd commented Mar 20, 2025

I don't want to waste your time. I believe I've already added tests to cover it.

You'll see that the tests cover the consumer being removed (closed) after this PR's fix.

That's the issue I'm addressing.

In this scenario, not getting the consumer object is expected. Why would we add a test case for it? A method throws an exception, and you want to cover that it returns null?

If you can review the code directly and let me know where improvements are needed, I'd be happy to optimize it.

@lhotari
Member

lhotari commented Mar 20, 2025

I don't want to waste your time. I believe I've already added tests to cover it.

It's true that there's a test, but it isn't written from the viewpoint of a user of our software. The flow described in the description isn't performed in the test case; a test that performed it would be an optimal "failing test case". In many software teams, this is a well-known concept. However, in the Pulsar code base, most of our tests don't follow the usual best practices and aren't good examples of how to write good tests.

You'll see that the tests cover the consumer being removed (closed) after this PR's fix.

That's not "Cannot close the consumer since user will get InterruptedException and can't get consumer object", which is what was in the description. I agree that the test covers the internal behavior change, but it doesn't cover the original bug as it appeared.

In this scenario, not getting the consumer object is expected. Why would we add a test case for it? A method throws an exception, and you want to cover that it returns null?

Simply test the scenario "Cannot close the consumer since user will get InterruptedException and can't get consumer object" and assert that it produces whatever the expected result is. It's no harder than that.

If you can review the code directly and let me know where improvements are needed, I'd be happy to optimize it.

As long as the mentioned scenario is handled in a separate test case, I'm fine with this PR. I know that it's unusual to nitpick such details in our PRs, but unless we pay attention to details, we won't be able to learn from each other and improve as a whole.

@shibd
Member Author

shibd commented Mar 21, 2025

Simply test the scenario "Cannot close the consumer since user will get InterruptedException and can't get consumer object" and assert that it produces whatever the expected result is. It's no harder than that.

First, I clearly asserted here that when the user calls thread.interrupt, the subscribe method throws an exception.

Isn't that enough?

https://github.com/apache/pulsar/pull/24100/files#diff-16facc169f1bbd7ab0a86d608b2aaa7ab8a578ff83c1d71d784b3df55d651559R71-R81

@shibd
Member Author

shibd commented Mar 21, 2025

Hi @lhotari, I respect all your opinions, but I really didn't understand what you meant.

Could you comment directly on the code about what needs to be changed? Or could you show the test you want directly in the code?

Member

@nodece nodece left a comment

This issue is somewhat peripheral, but it poses a potential risk. In Java, thread interruptions are often ignored, which can lead to resource leaks. Additionally, we have many similar methods that do not properly handle interruptions.

To ensure correctness, users should be aware that they must handle thread interruptions appropriately.

When using the Pulsar Client/Admin, users should not interrupt the calling thread.
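
As a generic Java illustration of handling an interruption rather than ignoring it (a sketch, not Pulsar code; `awaitCompletion` is a hypothetical helper), a blocking wait can restore the thread's interrupt status so that callers further up the stack still see the abort request and can release their resources:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class InterruptStatusSketch {
    /** Waits for the future; returns false if the wait was interrupted or the future failed. */
    static boolean awaitCompletion(CompletableFuture<?> future) {
        try {
            future.get();
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate an abort request
        boolean completed = awaitCompletion(new CompletableFuture<>());
        boolean stillInterrupted = Thread.interrupted(); // reads and clears the flag
        System.out.println("completed=" + completed + ", stillInterrupted=" + stillInterrupted);
    }
}
```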

@shibd
Member Author

shibd commented Mar 21, 2025

Hi @nodece, for operation-type APIs such as ack() and nack(), my understanding is that not handling interruptions is acceptable: whether the operation succeeds or fails during an interruption, the outcome is acceptable to the user.

However, we cannot consider the consumer leak described in this PR as expected behavior; we should fix it.

@lhotari
Member

lhotari commented Mar 21, 2025

Hi @lhotari, I respect all your opinions, but I really didn't understand what you meant.

Could you comment directly on the code about what needs to be changed? Or could you show the test you want directly in the code?

Thanks @shibd.

It is simply about "Cannot close the consumer since user will get InterruptedException and in this case user can't get consumer object". I don't understand the part "and in this case user can't get consumer object" and was expecting that a test case would have clarified it.

Now, reading your comments again, it seems that "user can't get consumer object" is expected behavior, so the sentence is just misleading and, as you have described, not something to be tested in the first place.
In that case, rephrasing the sentence in the description would resolve this. This isn't a blocker.
I'll check what an LLM would suggest for the PR description using the solution described in https://gist.github.com/lhotari/81f533af4b9ad515e02d96e543c4408b. It usually gives a pretty good draft, and an LLM can also be used to fix grammar issues in the PR description.

@lhotari
Member

lhotari commented Mar 21, 2025

@shibd this is the unedited DeepSeek suggestion: https://gist.github.com/lhotari/c76c08a59c6c17b27425cea04b7fa215

Member

@lhotari lhotari left a comment

LGTM, good work @shibd

@lhotari lhotari changed the title from "[fix][client] Consumer leak when thread.Interrupted is called before subscribed" to "[fix][client] Fix consumer leak when thread is interrupted before subscribe completes" Mar 21, 2025
@lhotari lhotari merged commit e51a639 into apache:master Mar 21, 2025
56 checks passed
@lhotari lhotari added this to the 4.1.0 milestone Mar 21, 2025
shibd added a commit that referenced this pull request Mar 21, 2025
shibd added a commit that referenced this pull request Mar 21, 2025
shibd added a commit that referenced this pull request Mar 21, 2025
bahetimansi pushed a commit to bahetimansi/pulsar that referenced this pull request Mar 23, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 6bc4f8f)
nodece pushed a commit to nodece/pulsar that referenced this pull request Mar 27, 2025
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 27, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 6bc4f8f)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 28, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 6bc4f8f)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 28, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 9d8e385)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 29, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 9d8e385)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 31, 2025
…scribe completes (apache#24100)

(cherry picked from commit e51a639)
(cherry picked from commit 9d8e385)
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025

5 participants