Skip to content

Conversation

@3pacccccc
Copy link
Contributor

Motivation

Currently, when consumer.receive() is called, it blocks indefinitely even when the consumer is being closed. This can lead to threads hanging indefinitely in applications that need to gracefully shutdown consumers. The issue can be reproduced with:

Thread thread = new Thread(() -> {
    try {
        // sleep 0.1 second to close consumer to ensure consumer.receive() is triggerd
        Thread.sleep(1000);
        consumer.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
thread.start();
// hangs forever
consumer.receive();

in fact, this could happen in many scenario as long as consumer.receive() called before consumer.close()

so, this PR aims to ensure that consumer.receive() will immediately throw an IllegalStateException when the consumer is being closed, providing clear feedback and preventing thread hangs

Modifications

1.Added termination handling in GrowableArrayBlockingQueue to:

  • Check termination flag in take() and poll() methods

  • Throw IllegalStateException with "Queue is terminated" message when terminated

2.Modified ConsumerImpl.closeConsumerTasks() to terminate the incoming message queue

3.Added tests covering:

  • Regular consumer close during receive()

  • Zero queue size consumer receives a batchMessage and another thread close consuer

Verifying this change

  • Make sure that the change passes the CI checks.
    If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)

  • The public API

  • The schema

  • The default values of configurations

  • The threading model

  • The binary protocol

  • The REST endpoints

  • The admin CLI options

  • The metrics

  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: 3pacccccc#15

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 23, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! left some review comments

@3pacccccc
Copy link
Contributor Author

Good catch! left some review comments

thank you so such for pointing these out, I've pushed a new commit, please take a look again.

@3pacccccc 3pacccccc requested a review from lhotari July 24, 2025 11:28
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good work @3pacccccc

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a need to handle the case where more than one thread is waiting.

@3pacccccc
Copy link
Contributor Author

There's a need to handle the case where more than one thread is waiting.

IMO, because of the headLock ensures that there is at most one thread can be waiting on the condition. so, I think the scenario handle the case where more than one thread is waiting. might not be existed.
@lhotari

@lhotari
Copy link
Member

lhotari commented Jul 25, 2025

IMO, because of the headLock ensures that there is at most one thread can be waiting on the condition. so, I think the scenario handle the case where more than one thread is waiting. might not be existed.
@lhotari

isNotEmpty has been created with headLock.newCondition(). When .await() is called, the lock is released. When .signal is called, the thread that was awaiting will acquire the lock again. The difference between .signal and .signalAll is that the first one will only wake up one thread. .signalAll will wake up one thread at a time.
There is a need to handle the multi-thread case.

@3pacccccc
Copy link
Contributor Author

IMO, because of the headLock ensures that there is at most one thread can be waiting on the condition. so, I think the scenario handle the case where more than one thread is waiting. might not be existed.
@lhotari

isNotEmpty has been created with headLock.newCondition(). When .await() is called, the lock is released. When .signal is called, the thread that was awaiting will acquire the lock again. The difference between .signal and .signalAll is that the first one will only wake up one thread. .signalAll will wake up one thread at a time. There is a need to handle the multi-thread case.

@lhotari yes! you're right. Thank you so much for pointing this out, I've pushed a new commit, PTAL

@3pacccccc 3pacccccc requested a review from lhotari July 25, 2025 16:22
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good work @3pacccccc

@codecov-commenter
Copy link

codecov-commenter commented Jul 25, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.26%. Comparing base (bbc6224) to head (b4a02da).
⚠️ Report is 1311 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24550      +/-   ##
============================================
+ Coverage     73.57%   74.26%   +0.69%     
+ Complexity    32624    32595      -29     
============================================
  Files          1877     1876       -1     
  Lines        139502   146326    +6824     
  Branches      15299    16781    +1482     
============================================
+ Hits         102638   108673    +6035     
- Misses        28908    29021     +113     
- Partials       7956     8632     +676     
Flag Coverage Δ
inttests 26.70% <50.00%> (+2.11%) ⬆️
systests 23.28% <50.00%> (-1.04%) ⬇️
unittests 73.77% <100.00%> (+0.93%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 79.24% <100.00%> (+1.67%) ⬆️
...n/util/collections/GrowableArrayBlockingQueue.java 97.58% <100.00%> (+2.15%) ⬆️

... and 1111 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari lhotari merged commit 6ad57da into apache:master Jul 25, 2025
96 of 98 checks passed
@lhotari lhotari added this to the 4.1.0 milestone Jul 25, 2025
lhotari pushed a commit that referenced this pull request Jul 25, 2025
lhotari pushed a commit that referenced this pull request Jul 25, 2025
lhotari pushed a commit that referenced this pull request Jul 25, 2025
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 28, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 28, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
@3pacccccc 3pacccccc deleted the cutReceiveCall branch July 29, 2025 15:07
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 30, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 31, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants