Skip to content

Conversation

@BewareMyPower
Copy link
Contributor

Fixes #24497

Background about the initialization of PersistentTopic

The initialization includes the chained asynchronous calls of initialize, preCreateSubscriptionForCompactionIfNeeded, checkReplication and checkDeduplicationStatus methods. Calling all of them in BrokerService is unnecessarily complicated. Ideally, they should be wrapped in a single initialize() method for BrokerService to call.

The recovery of transaction buffer (TopicTransactionBuffer) should belong to the part of initialization as well. But it adopts an asynchronous way that if it failed to recover, it would close the PersistentTopic. This makes code complicated, which requires a more complicated fix (#23884) for the bug that could never have a chance to happen if the PersistentTopic cannot be accessed before the transaction buffer recovery is done.

Motivation

If a PersistentTopic is stuck at initialization due to some bugs, the topic cache (BrokerService#topics) will never remove it. There is a topic initialization timeout configured by topicLoadTimeoutSeconds (60 by default). However, if the topic loading does not finish within the timeout, it won't be removed from the cache.

The dirty topic cache issue has been standing for long time, fixes like #22860 and #23184 only aim at calling topics.remove(topic, topicFuture) in many code paths. But there are still some cases not considered, for example, in #24497, we found two topics were not available until unloading them.

Modifications

This PR adopts a simple and effective solution by removing the topic from the cache when the topic loading times out. The problem is that there could be two pending topic initializations. However, the old PersistentTopic never has a chance to be accessed because it does not exist in the topic cache.

If the topic loading already times out when a managed ledger is opened, just skip creating the PersistentTopic. Then next time the same topic is loaded, the managed ledger will already exist in the cache.

In addition, to address the possible stuck issue in MessageDeduplication, this PR:

  • Adopts a future-based managed ledger util methods instead of callback-based managed ledger APIs.
  • When the topic times out, failing the future returned by MessageDeduplication#checkStatus
  • To keep the behavior of [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck #23004 not changed, when the case above happens, take snapshot for the latest replayed snapshot
  • To avoid being affected by the closing of outdated topic, skip closing the managed ledger in PersistentTopic#close

The following tests are added to OrphanPersistentTopicTest

  • testDeduplicationReplayStuck: inject high latency for asyncReadEntries, after the topic loading timeout, a new managed cursor will be created for replay, which does not have high read latency.
  • testOrphanManagedLedgerRemovedAfterUnload: since an orphan managed ledger will be created for loading timeout, verify it will be removed if the topic is unloaded.
  • testDuplicatedTopics: verify closing the old PersistentTopic won't close the managed ledger so that the current PersistentTopic can still work

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#48

@BewareMyPower BewareMyPower self-assigned this Jul 10, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 10, 2025

topicFuture.exceptionally(t -> {
topicFuture.exceptionallyAsync(e -> {
pulsarStats.recordTopicLoadFailed();
Copy link
Contributor

@poorbarcode poorbarcode Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary change of #22860 is to remove the failed Future case by case instead of removing a failed topic creation future here.

@shibd @codelipenghui Could you also review the current PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I explained all details in the PR description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key point:

But there are still some cases not considered

#24497 is the best example. #23184 is another example before.

The main concern is that if an orphan pending PersistentTopic could affect the new pending PersistentTopic, mainly due to:

  • Managed ledger, which was handled in this PR
  • Replicators, especially the Pulsar producer

Even if it could make the new PersistentTopic fail to load, it will be the same with the existing case, with better exception message (e.g. replicator create failure) rather than the misleading timeout exception (Failed to load topic within timeout).

}
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can checkDeduplicationStatus first before checkReplication to avoid creating replicator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Topic cannot be accessed for "Failed to load topic within timeout" unless unloading the namespace

3 participants