
Conversation

@BewareMyPower (Contributor)

Motivation

The cherry-pick of a399b5c inserts into the topic future cache twice.

1st time:

return topics.computeIfAbsent(topicName.toString(), (tpName) ->

or

return topics.computeIfAbsent(topicName.toString(), (tpName) ->

2nd time:

final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> {

In this case, the topic future might fail with a timeout. This error was exposed after dd149d4 because a future completed with LowOverheadTimeoutException won't trigger cache invalidation.
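
To make the hazard concrete, here is a minimal standalone sketch (hypothetical names and simplified types; not the actual broker code). Once a future that completed exceptionally is left in the cache, every subsequent computeIfAbsent call returns that same failed future instead of retrying the load:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TopicCacheSketch {

    static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> topics =
            new ConcurrentHashMap<>();

    // Stand-in for the broker's topic loading; here the load always "times out".
    static CompletableFuture<Optional<String>> loadTopic(String tpName) {
        CompletableFuture<Optional<String>> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("timeout while loading " + tpName));
        // If this failure does not trigger cache invalidation,
        // the failed future stays in the map indefinitely.
        return future;
    }

    public static void main(String[] args) {
        // 1st insertion point
        topics.computeIfAbsent("my-topic", TopicCacheSketch::loadTopic);
        // 2nd insertion point: computeIfAbsent sees the existing (failed)
        // entry and returns it as-is, so the topic can never load again.
        CompletableFuture<Optional<String>> cached =
                topics.computeIfAbsent("my-topic", TopicCacheSketch::loadTopic);
        System.out.println("still failed: " + cached.isCompletedExceptionally()); // true
    }
}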

Modifications

Move the fetchPartitionedTopicMetadataAsync check to loadOrCreatePersistentTopic.

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

@BewareMyPower BewareMyPower self-assigned this Nov 5, 2025
@BewareMyPower BewareMyPower requested a review from lhotari November 5, 2025 11:36
@github-actions github-actions bot added the doc-not-needed label Nov 5, 2025
@lhotari lhotari changed the title [branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts [fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts Nov 5, 2025
@lhotari (Member) left a comment

LGTM

@lhotari (Member) commented Nov 5, 2025

/pulsarbot rerun-failure-checks

@lhotari (Member) commented Nov 5, 2025

> In this case, the topic future might fail with a timeout. This error was exposed after dd149d4 because a future completed with LowOverheadTimeoutException won't trigger cache invalidation.

Just wondering if there should be code to handle this, similar to what's done in createNonPersistentTopic:

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
    CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
    // On any failure, record the failed load and evict this future from the
    // cache so that a later lookup can retry the topic creation.
    topicFuture.exceptionally(t -> {
        pulsarStats.recordTopicLoadFailed();
        pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
        return null;
    });
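
(Note that the two-argument topics.remove(topic, topicFuture) only removes the entry if it still maps to this exact future, so a newer future inserted by a retry is left in place.)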

@BewareMyPower (Contributor, Author)

Well, there has already been a debate about this in https://github.com/apache/pulsar/pull/24500/files#r2199584548

The timeout failure usually represents an unknown state. Removing it from the cache cannot cancel the ongoing topic loading process in the background. If we added that logic, there could be two concurrent topic loading tasks and some tests might fail (you can see the changes in #24500). We have to take care of this case.

Topic loading for non-persistent topics is much simpler than for persistent topics, so in practice the timeout exception hardly ever happens there.
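
A simplified sketch of the race described above (hypothetical names; not the broker code). Evicting the timed-out future cannot cancel the in-flight load, so a retry starts a second, concurrent load of the same topic:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DoubleLoadSketch {

    static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> topics =
            new ConcurrentHashMap<>();
    static final ExecutorService executor = Executors.newCachedThreadPool();

    static CompletableFuture<Optional<String>> getTopic(String name) {
        return topics.computeIfAbsent(name, tpName -> {
            CompletableFuture<Optional<String>> future = new CompletableFuture<>();
            executor.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2); // a slow load that outlives the timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("load finished for " + tpName);
                future.complete(Optional.of(tpName));
            });
            return future;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Optional<String>> first = getTopic("my-topic");
        // The caller times out and evicts the entry, but the background load
        // is still running and cannot be cancelled from here.
        topics.remove("my-topic", first);
        // A retry now kicks off a second load of the same topic concurrently.
        getTopic("my-topic");
        TimeUnit.SECONDS.sleep(3); // "load finished for my-topic" prints twice
        executor.shutdown();
    }
}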

@lhotari (Member) commented Nov 5, 2025

> Well, there has already been a debate about this in https://github.com/apache/pulsar/pull/24500/files#r2199584548
>
> The timeout failure usually represents an unknown state. Removing it from the cache cannot cancel the ongoing topic loading process in the background. If we added that logic, there could be two concurrent topic loading tasks and some tests might fail (you can see the changes in #24500). We have to take care of this case.
>
> Topic loading for non-persistent topics is much simpler than for persistent topics, so in practice the timeout exception hardly ever happens there.

@BewareMyPower Yes, I agree. I came to the same conclusion when looking into the code. However, there are also problems in the current CompletableFuture-and-timeout-based solution. For example, if the topic has already opened the ledger when the timeout occurs, the load won't be able to complete the future successfully, and the topic needs to be closed right after it was successfully opened:

if (!topicFuture.complete(Optional.of(persistentTopic))) {
    // Check create persistent topic timeout.
    if (topicFuture.isCompletedExceptionally()) {
        log.warn("{} future is already completed with failure {}, closing"
                + " the topic", topic, FutureUtil.getException(topicFuture));
    } else {
        // It should not happen.
        log.error("{} future is already completed by another thread, "
                + "which is not expected. Closing the current one", topic);
    }
    executor().submit(() -> {
        persistentTopic.close().whenComplete((ignore, ex) -> {
            topics.remove(topic, topicFuture);
            if (ex != null) {
                log.warn("[{}] Get an error when closing topic.",
                        topic, ex);
            }
        });
    });

This doesn't make much sense. Instead of using CompletableFuture values in maps, a better approach would be an Entry class that holds more of the related state, with a CompletableFuture inside it. In the topic loading timeout scenario, it doesn't make sense to terminate the load if it's already opening the ledger: the system would only get more loaded by closing the topic and reopening it in a retry.
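
A rough sketch of that direction, with all names hypothetical (this is not Pulsar's actual API): the cached value carries loading state alongside the future, so a timeout watchdog can check whether the ledger is already being opened and, if so, let the load finish instead of failing it:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class TopicLoadEntrySketch {

    interface Topic { } // stand-in for the broker's Topic type

    static final class TopicLoadEntry {
        enum State { PENDING, OPENING_LEDGER, COMPLETED, FAILED }

        final CompletableFuture<Optional<Topic>> future = new CompletableFuture<>();
        private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);

        // The loader calls this right before it starts opening the ledger.
        void markOpeningLedger() {
            state.compareAndSet(State.PENDING, State.OPENING_LEDGER);
        }

        // The timeout watchdog calls this: the load is only failed while it
        // is still pending. Once the ledger is being opened, the watchdog
        // backs off rather than closing a freshly opened topic in a retry.
        boolean failIfStillPending(Throwable cause) {
            if (state.compareAndSet(State.PENDING, State.FAILED)) {
                return future.completeExceptionally(cause);
            }
            return false;
        }

        // The loader calls this when the topic is fully created; the
        // CompletableFuture itself arbitrates any remaining race.
        void complete(Topic topic) {
            state.set(State.COMPLETED);
            future.complete(Optional.of(topic));
        }
    }
}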

@BewareMyPower (Contributor, Author) commented Nov 5, 2025

IMO, we should not apply a timeout to topic loading at all. When the timeout happens, there must be a bug, or CPU resources are exhausted. You can see topicLoadTimeoutSeconds is 60, which is already very long. Just imagine creating a producer whose future does not complete within 60 seconds, or a reconnection triggered by topic unloading that doesn't succeed within 60 seconds.

In production environments, the timeout is usually caused by topic replay, because metadata store operations should not be that slow:

  1. __change_events replay, or
  2. message deduplication replay on the topic

The 1st case is usually caused by a bug in the system-topic-based topic policies service. It is hard to say whether retrying helps.

The 2nd case is usually caused by the snapshot not being taken in time, and retrying the topic replay never helps there. That's also the motivation of #23004, which introduces testFinishTakeSnapshotWhenTopicLoading.

I worked on this part before, but I haven't had time recently. Here are some local notes I wrote earlier: https://gist.github.com/BewareMyPower/c7f26eb2b5741c94d8cf771c4dc35a97

@BewareMyPower BewareMyPower merged commit 6e3d5d8 into branch-4.0 Nov 5, 2025
120 of 128 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-4.0 branch November 5, 2025 13:31
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…oading due to topic future cache conflicts (apache#24947)

(cherry picked from commit 6e3d5d8)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…oading due to topic future cache conflicts (apache#24947)

(cherry picked from commit 6e3d5d8)

Labels

doc-not-needed, release/4.0.9