Skip to content

Conversation

@3pacccccc
Copy link
Owner

@3pacccccc 3pacccccc commented Jul 23, 2025

Motivation

Currently, when consumer.receive() is called, it blocks indefinitely even when the consumer is being closed. This can lead to threads hanging indefinitely in applications that need to gracefully shutdown consumers. The issue can be reproduced with:

Thread thread = new Thread(() -> {
    try {
        // sleep 0.1 second to close consumer to ensure consumer.receive() is triggerd
        Thread.sleep(1000);
        consumer.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
thread.start();
// hangs forever
consumer.receive();

in fact, this could happen in many scenario as long as consumer.receive() called before consumer.close()

Modifications

1.Added termination handling in GrowableArrayBlockingQueue to:

  • Check termination flag in take() and poll() methods

  • Throw IllegalStateException with "Queue is terminated" message when terminated

2.Modified ConsumerImpl.closeConsumerTasks() to terminate the incoming message queue

3.Added tests covering:

  • Regular consumer close during receive()

  • Zero queue size consumer receives a batchMessage and another thread close consuer

Verifying this change

  • Make sure that the change passes the CI checks.
    If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)

  • The public API

  • The schema

  • The default values of configurations

  • The threading model

  • The binary protocol

  • The REST endpoints

  • The admin CLI options

  • The metrics

  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@3pacccccc 3pacccccc changed the title [improve][client]cut consumer.receive() when consumer get closed [improve][client] Terminate consumer.receive() when consumer is closed Jul 23, 2025
@3pacccccc 3pacccccc closed this Jul 29, 2025
@3pacccccc 3pacccccc deleted the cutReceiveCall branch July 29, 2025 15:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants