-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] Fix failed topic future not removed before the deduplication recovery is done #24500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix failed topic future not removed before the deduplication recovery is done #24500
Conversation
…ntTopic future is done
|
|
||
| topicFuture.exceptionally(t -> { | ||
| topicFuture.exceptionallyAsync(e -> { | ||
| pulsarStats.recordTopicLoadFailed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The primary change of #22860 is to remove the failed Future case by case instead of removing a failed topic creation future here.
@shibd @codelipenghui Could you also review the current PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I explained all details in the PR description.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key point:
But there are still some cases not considered
#24497 is the best example. #23184 is another example before.
The main concern is that if an orphan pending PersistentTopic could affect the new pending PersistentTopic, mainly due to:
- Managed ledger, which was handled in this PR
- Replicators, especially the Pulsar producer
Even if it could make the new PersistentTopic fail to load, it will be the same with the existing case, with better exception message (e.g. replicator create failure) rather than the misleading timeout exception (Failed to load topic within timeout).
| } | ||
| PersistentTopic persistentTopic = isSystemTopic(topic) | ||
| ? new SystemTopic(topic, ledger, BrokerService.this) | ||
| : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can checkDeduplicationStatus first before checkReplication to avoid creating replicator.
Fixes #24497
Background about the initialization of
PersistentTopicThe initialization includes the chained asynchronous calls of
initialize,preCreateSubscriptionForCompactionIfNeeded,checkReplicationandcheckDeduplicationStatusmethods. Calling all of them inBrokerServiceis unnecessarily complicated. Ideally, they should be wrapped in a singleinitialize()method forBrokerServiceto call.The recovery of transaction buffer (
TopicTransactionBuffer) should belong to the part of initialization as well. But it adopts an asynchronous way that if it failed to recover, it would close thePersistentTopic. This makes code complicated, which requires a more complicated fix (#23884) for the bug that could never have a chance to happen if thePersistentTopiccannot be accessed before the transaction buffer recovery is done.Motivation
If a
PersistentTopicis stuck at initialization due to some bugs, the topic cache (BrokerService#topics) will never remove it. There is a topic initialization timeout configured bytopicLoadTimeoutSeconds(60 by default). However, if the topic loading does not finish within the timeout, it won't be removed from the cache.The dirty topic cache issue has been standing for long time, fixes like #22860 and #23184 only aim at calling
topics.remove(topic, topicFuture)in many code paths. But there are still some cases not considered, for example, in #24497, we found two topics were not available until unloading them.Modifications
This PR adopts a simple and effective solution by removing the topic from the cache when the topic loading times out. The problem is that there could be two pending topic initializations. However, the old
PersistentTopicnever has a chance to be accessed because it does not exist in the topic cache.If the topic loading already times out when a managed ledger is opened, just skip creating the
PersistentTopic. Then next time the same topic is loaded, the managed ledger will already exist in the cache.In addition, to address the possible stuck issue in
MessageDeduplication, this PR:MessageDeduplication#checkStatusPersistentTopic#closeThe following tests are added to
OrphanPersistentTopicTesttestDeduplicationReplayStuck: inject high latency forasyncReadEntries, after the topic loading timeout, a new managed cursor will be created for replay, which does not have high read latency.testOrphanManagedLedgerRemovedAfterUnload: since an orphan managed ledger will be created for loading timeout, verify it will be removed if the topic is unloaded.testDuplicatedTopics: verify closing the oldPersistentTopicwon't close the managed ledger so that the currentPersistentTopiccan still workDocumentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: BewareMyPower#48