Skip to content

Conversation

@oneby-wang
Copy link
Contributor

@oneby-wang oneby-wang commented Sep 3, 2025

Motivation

In PatternMultiTopicsConsumerImpl, if watcherFuture is not done, the closeAsync() method will not close TopicListWatcher.

List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
if (watcherFuture.isDone() && !watcherFuture.isCompletedExceptionally()) {
TopicListWatcher watcher = watcherFuture.getNow(null);
// watcher can be null when subscription mode is not persistent
if (watcher != null) {
closeFutures.add(watcher.closeAsync());
}
}

Modifications

Re-write the closeAsync() method to make sure TopicListWatcher is closed. See file changes.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, see PatternMultiTopicsConsumerImplTest.

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions
Copy link

github-actions bot commented Sep 3, 2025

@oneby-wang Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Sep 3, 2025
@oneby-wang oneby-wang force-pushed the patternConsumer_closeAsync branch from 8aa9a41 to 5694f22 Compare September 3, 2025 09:07
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution! Please check the review comments.

oneby-wang added 2 commits September 4, 2025 09:45
@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 4, 2025

Hi, @lhotari thanks for your code review and suggestion, I didn't go that far before, I encountered this problem when I contributed to spring-pulsar, see DefaultPulsarConsumerFactoryTests.java.

First, ignore errors or not? I think throwing Exception is very important(for debug reason), unless error messages already printed. So it is safe to ignore the following two errors and continue CompletableFuture progress.

  1. watcherFuture error: already printed.

    if (nonRetriableError) {
    exception.setPreviousExceptionCount(previousExceptionCount);
    if (watcherFuture.completeExceptionally(exception)) {
    setState(State.Failed);
    log.info("[{}] Watcher creation failed for {} with non-retriable error {}",
    topic, name, exception.getMessage());
    deregisterFromClientCnx();
    return false;
    }

    and
    log.warn("[{}][{}] Failed to create topic list watcher on {}",
    topic, getHandlerName(), cnx.channel().remoteAddress());
    if (e.getCause() instanceof PulsarClientException
    && PulsarClientException.isRetriableError(e.getCause())
    && System.currentTimeMillis()
    < CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) {
    future.completeExceptionally(e.getCause());
    } else if (!watcherFuture.isDone()) {
    // unable to create new watcher, fail operation
    setState(State.Failed);
    watcherFuture.completeExceptionally(
    PulsarClientException.wrap(e, String.format("Failed to create topic list watcher %s"
    + "when connecting to the broker", getHandlerName())));

    and
    BaseCommand cmd = Commands.newWatchTopicListClose(watcherId, requestId);
    cnx.newWatchTopicListClose(cmd, requestId).handle((v, exception) -> {
    final ChannelHandlerContext ctx = cnx.ctx();
    boolean ignoreException = ctx == null || !ctx.channel().isActive();
    if (ignoreException && exception != null) {
    log.debug("Exception ignored in closing watcher", exception);
    }
    cleanupAtClose(closeFuture, ignoreException ? null : exception);
    return null;
    });
    }

    and
    private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
    RequestType requestType, boolean flush,
    TimedCompletableFuture<T> future) {
    pendingRequests.put(requestId, future);
    if (flush) {
    ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
    if (!writeFuture.isSuccess()) {
    if (pendingRequests.remove(requestId, future) && !future.isDone()) {
    log.warn("{} Failed to send {} to broker: {}", ctx.channel(),
    requestType.getDescription(), writeFuture.cause().getMessage());
    future.completeExceptionally(writeFuture.cause());
    }
    }
    });
    } else {
    ctx.write(requestMessage, ctx().voidPromise());
    }
    requestTimeoutQueue.add(new RequestTime(requestId, requestType));
    }

  2. runningTaskCancelCloseFuture error: already ignored.

    public synchronized CompletableFuture<Void> cancelAllAndWaitForTheRunningTask() {
    this.closed = true;
    if (taskInProgress == null) {
    return CompletableFuture.completedFuture(null);
    }
    // If the in-progress task is consumer init task, it means nothing is in-progress.
    if (taskInProgress.getLeft().equals(UpdateSubscriptionType.CONSUMER_INIT)) {
    return CompletableFuture.completedFuture(null);
    }
    return taskInProgress.getRight().thenAccept(__ -> {}).exceptionally(ex -> null);
    }

Second, close execution order.
topicListWatcherCloseFuture and runningTaskCancelCloseFuture can be closed concurrently, after they are closed, then close super.closeAsync().

But one thing, add volatile on PatternConsumerUpdateQueue.closed field, because PatternMultiTopicsConsumerImpl.closeAsync() method and PatternConsumerUpdateQueue.triggerNextTask() method may probably invoked in different threads.

TopicListWatcher will append TopicsRemovedOp and TopicsAddedOp to patternConsumerUpdateQueue, which will call PatternConsumerUpdateQueue.triggerNextTask() method.

public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) {
patternConsumerUpdateQueue.appendTopicsRemovedOp(update.getDeletedTopicsList());
patternConsumerUpdateQueue.appendTopicsAddedOp(update.getNewTopicsList());
}

Undoubtedly, super.closeAsync()(close multi-consumers) should be closed after all PatternMultiTopicsConsumerImpl tasks are closed.

As you suggested and my replied content, I applied the code review change and pushed to this pull request.

@oneby-wang
Copy link
Contributor Author

Sorry for my mistake, closed field doesn't need volatile, it is protected by synchronized.
cancelAllAndWaitForTheRunningTask() and triggerNextTask() can be invoked concurrently, they are also protected by synchronized.
Already fixed.

lhotari
lhotari previously approved these changes Sep 4, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 4, 2025

Hi, @lhotari there is a test failure in Pulsar CI / CI - Unit - Pulsar Client (pull_request), I think the problem is TopicListWatcher.connectionFailed() method doesn't have a dealine or maxRetryCount. If PulsarClient connection is closed, watcherFuture will never complete.

public boolean connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
if (nonRetriableError) {
exception.setPreviousExceptionCount(previousExceptionCount);
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Watcher creation failed for {} with non-retriable error {}",
topic, name, exception.getMessage());
deregisterFromClientCnx();
return false;
}
} else {
previousExceptionCount.incrementAndGet();
}
return true;
}

My local test output:

13:28:39.441 [pulsar-client-internal-36-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-6hKvG] [with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub] Closed Topics Consumer
13:28:39.443 [Test worker] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:60422
13:28:39.454 [pulsar-client-io-23-3] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xc4959e73, L:/[127.0.0.1:60463](http://127.0.0.1:60463/) ! R:localhost/[127.0.0.1:60422](http://127.0.0.1:60422/)] Disconnected
13:28:39.462 [pulsar-client-io-23-3] WARN  org.apache.pulsar.client.impl.BinaryProtoLookupService - [persistent://public/default/topic-.*] failed to send lookup request : Disconnected from server at localhost/[127.0.0.1:60422](http://127.0.0.1:60422/)
13:28:39.462 [pulsar-client-io-23-3] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Disconnected from server at localhost/[127.0.0.1:60422](http://127.0.0.1:60422/)
13:28:39.463 [pulsar-client-io-23-3] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Disconnected from server at localhost/[127.0.0.1:60422](http://127.0.0.1:60422/) -- Will try again in 0.1 s
13:28:39.591 [pulsar-timer-43-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Reconnecting after connection was closed
13:28:39.853 [pulsar-client-io-23-6] WARN  org.apache.pulsar.client.impl.ConnectionPool - Failed to open connection to localhost/<unresolved>:60422 : java.nio.channels.ClosedChannelException
13:28:39.854 [pulsar-client-io-23-6] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.nio.channels.ClosedChannelException
13:28:39.854 [pulsar-client-io-23-6] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.nio.channels.ClosedChannelException -- Will try again in 0.183 s
13:28:40.039 [pulsar-timer-43-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/topic-.*] [Watcher(org.apache.pulsar.common.topics.RE2JTopicsPattern@432ed00c)] Reconnecting after connection was closed

I read the source code of ConsumerImpl.connectionFailed() method, and I prepare to use this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); as watcherFuture's deadline. I think it is reasonable, because TopicListWatcher and TopicLookup are kind of same things.

public boolean connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptionCount(previousExceptionCount);
if (subscribeFuture.completeExceptionally(exception)) {
fail(exception);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}",
topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId);
}
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
return false;
} else {
Throwable actError = FutureUtil.unwrapCompletionException(exception);
if (isUnrecoverableError(actError)) {
closeWhenReceivedUnrecoverableError(actError, null);
return false;
}
}
} else {
previousExceptionCount.incrementAndGet();
}
return true;
}

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 4, 2025

Hi, @lhotari and another test failure, I can't see any detailed error message, need more input help. Pulsar CI / CI - System - Pulsar Connectors - Thread (pull_request)

@lhotari
Copy link
Member

lhotari commented Sep 4, 2025

Hi, @lhotari and another test failure, I can't see any detailed error message, need more input help. Pulsar CI / CI - System - Pulsar Connectors - Thread (pull_request)

Pulsar contains also a lot of flaky tests and for PRs you can trigger a retry with /pulsarbot rerun-failure-checks comment on the PR. For full control, I'd recommend ensuring that your PR changes pass in "Personal CI" in your own fork. The solution is explained in https://pulsar.apache.org/contribute/personal-ci/ . You setup GitHub Actions in your own fork and open a PR to your own fork and run tests there.

In this case, it wasn't a test failure, but the build just didn't succeed in downloading the docker image that is built in another step as part of the workflow. A retry with /pulsarbot rerun-failure-checks would address this.

Getting test errors for integration tests / system tests requires downloading the surefire zip file from GitHub Actions build artifacts for the build and inspecting the files. However, it wasn't the case here.

@lhotari
Copy link
Member

lhotari commented Sep 4, 2025

/pulsarbot rerun-failure-checks

@lhotari
Copy link
Member

lhotari commented Sep 4, 2025

@oneby-wang Perhaps you already noticed that this .exceptionally handler doesn't take the closing status into account:

new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId,
namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
watcherFuture
.exceptionally(ex -> {
log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling"
+ " for new topics", conf.getSubscriptionName(), ex);
this.recheckPatternTimeout = client.timer().newTimeout(
this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
return null;
});

it might also be useful to hold a reference to the TopicListWatcher instance in a field so that it could be closed explicitly.

@lhotari lhotari dismissed their stale review September 4, 2025 08:25

Need some more changes, please check comments

@oneby-wang
Copy link
Contributor Author

Thanks for input, I'll try to fix test failures.

@codecov-commenter
Copy link

codecov-commenter commented Sep 4, 2025

Codecov Report

❌ Patch coverage is 69.23077% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.12%. Comparing base (2e6a8eb) to head (f77b88c).
⚠️ Report is 175 commits behind head on master.

Files with missing lines Patch % Lines
...rg/apache/pulsar/client/impl/TopicListWatcher.java 42.85% 2 Missing and 2 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24698      +/-   ##
============================================
- Coverage     74.33%   74.12%   -0.21%     
+ Complexity    33534    33434     -100     
============================================
  Files          1895     1895              
  Lines        147954   147957       +3     
  Branches      17130    17130              
============================================
- Hits         109978   109671     -307     
- Misses        29256    29527     +271     
- Partials       8720     8759      +39     
Flag Coverage Δ
inttests 26.31% <0.00%> (-0.35%) ⬇️
systests 22.64% <46.15%> (-0.11%) ⬇️
unittests 73.64% <69.23%> (-0.19%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...ar/client/impl/PatternMultiTopicsConsumerImpl.java 75.00% <100.00%> (-0.68%) ⬇️
...rg/apache/pulsar/client/impl/TopicListWatcher.java 69.50% <42.85%> (+3.08%) ⬆️

... and 84 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari lhotari requested a review from Copilot September 4, 2025 12:08
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a resource leak issue where TopicListWatcher was not properly closed when PatternMultiTopicsConsumerImpl.closeAsync() was called. The original implementation only closed the watcher if watcherFuture.isDone() was true, which could leave the watcher unclosed in certain scenarios.

  • Modified PatternMultiTopicsConsumerImpl.closeAsync() to ensure TopicListWatcher is always closed regardless of future completion status
  • Added timeout handling to TopicListWatcher constructor and connection failure logic to prevent indefinite waiting
  • Improved error logging to distinguish between non-retriable errors and timeout scenarios

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
PatternMultiTopicsConsumerImpl.java Refactored closeAsync() method to guarantee watcher closure using thenCompose() instead of checking isDone()
TopicListWatcher.java Added timeout tracking with deadline field and enhanced connection failure handling to support timeout scenarios

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@Technoboy- Technoboy- added this to the 4.2.0 milestone Sep 5, 2025
@Technoboy- Technoboy- merged commit cc824e5 into apache:master Sep 5, 2025
51 checks passed
@lhotari
Copy link
Member

lhotari commented Sep 5, 2025

@coderzc @Technoboy- I think that the timeout solution could be a problem and that this PR was merged too early. The Pulsar Client is designed in a way where it should indefinitely keep on retrying until the client gets closed. This PR breaks that principle.

@lhotari
Copy link
Member

lhotari commented Sep 5, 2025

I created issue #24707 that should be addressed.

Technoboy- pushed a commit that referenced this pull request Sep 18, 2025
lhotari pushed a commit that referenced this pull request Sep 18, 2025
…icsConsumerImpl.closeAsync() method (#24698)

(cherry picked from commit cc824e5)
lhotari pushed a commit that referenced this pull request Sep 18, 2025
…icsConsumerImpl.closeAsync() method (#24698)

(cherry picked from commit cc824e5)
lhotari pushed a commit that referenced this pull request Sep 18, 2025
…icsConsumerImpl.closeAsync() method (#24698)

(cherry picked from commit cc824e5)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 18, 2025
…icsConsumerImpl.closeAsync() method (apache#24698)

(cherry picked from commit cc824e5)
(cherry picked from commit ab5ccf4)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 18, 2025
…icsConsumerImpl.closeAsync() method (apache#24698)

(cherry picked from commit cc824e5)
(cherry picked from commit ab5ccf4)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Sep 19, 2025
…icsConsumerImpl.closeAsync() method (apache#24698)

(cherry picked from commit cc824e5)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 19, 2025
…icsConsumerImpl.closeAsync() method (apache#24698)

(cherry picked from commit cc824e5)
(cherry picked from commit 64e2dc2)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 19, 2025
…icsConsumerImpl.closeAsync() method (apache#24698)

(cherry picked from commit cc824e5)
(cherry picked from commit 64e2dc2)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants