Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
StreamNative Pulsar 4.0.5.2, which is based on 6b5fdbf
Issue Description
This issue happened when we restarted brokers for an upgrade. Two partitions (10 and 23 of a partitioned topic) owned by the same broker were stuck initializing the PersistentTopic. Here is the relevant information from the heap dump:
topic:
- name: "<topic>-partition-23"
messageDeduplication:
snapshotCounter: 21
managedCursor:
entriesReadCount: 42
lastMarkDeleteEntry:
newPosition: "860608:298"
pendingReadOps: 0
readPosition: "861217:3"
managedLedger:
config:
readEntryTimeoutSeconds: 0
executor:
runner: "BookKeeperClientWorker-OrderedExecutor-0-0"
tasksFailed: 1
lastConfirmedEntry: "861217:2"
ledgers:
- key: 860608
value: {"entries": 338}
- key: 861217
value: {"entries": 3}
- key: 861315
value: {"entries": 0}
- name: "<topic>-partition-10"
messageDeduplication:
snapshotCounter: 0
managedCursor:
entriesReadCount: 84
lastMarkDeleteEntry:
newPosition: "860650:231"
pendingReadOps: 1
readPosition: "861214:0"
managedLedger:
currentLedger:
ledgerId: 861295
lastConfirmedEntry: "861214:1"
ledgers:
- key: 860650
value: {"entries": 314}
- key: 861214
value: {"entries": 2}
- key: 861295
value: {"entries": 0}
state: LedgerOpened
The key is the MessageDeduplication field:
- status is Initialized
- managedCursor is not null
- snapshotCounter is non-zero
From
Lines 292 to 298 in ce102da
    managedCursor = cursor;
    recoverSequenceIdsMap().thenRun(() -> {
        status = Status.Enabled;
        future.complete(null);
        log.info("[{}] Enabled deduplication", topic.getName());
    }).exceptionally(ex -> {
        status = Status.Failed;
we can conclude that the future returned by recoverSequenceIdsMap never completed.
The difference between partition 10 and 23 is:
- Partition 10's cursor read position had not reached the last confirmed entry, and there was a pending read operation. It seems that a read operation was somehow stuck forever.
- Partition 23's cursor read position exceeds the last confirmed entry, so the future should have completed. Another suspicious point is that snapshotCounter is 21 while entriesReadCount is 42.
The most likely cause is that an exception was thrown in a callback such as
Line 178 in ce102da
    public void readEntriesComplete(List<Entry> entries, Object ctx) {
Something went wrong with our log collection, so the key logs from SingleThreadExecutor#run were lost. However, the heap dump shows that the tasksFailed field is 1.
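To illustrate the suspected failure mode, here is a minimal, self-contained sketch. It is not Pulsar code: `StuckFutureDemo`, `onReadComplete`, and the counting wrapper are hypothetical stand-ins, and the assumption is that the executor catches and counts exceptions thrown by a task. If a callback throws before it completes the future, the future stays pending forever and the only trace left is the failed-task counter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StuckFutureDemo {

    /** Outcome of one simulated run. */
    static final class Result {
        final boolean completed;
        final int tasksFailed;

        Result(boolean completed, int tasksFailed) {
            this.completed = completed;
            this.tasksFailed = tasksFailed;
        }
    }

    // Hypothetical stand-in for a read callback; this is not Pulsar code.
    static void onReadComplete(CompletableFuture<Void> future, boolean throwInCallback) {
        if (throwInCallback) {
            // Thrown before complete(...) is reached, so nobody completes the future.
            throw new IllegalStateException("simulated failure inside the callback");
        }
        future.complete(null);
    }

    static Result run(boolean throwInCallback) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicInteger tasksFailed = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Assumed runner behavior: catch the exception and only count it.
        executor.execute(() -> {
            try {
                onReadComplete(future, throwInCallback);
            } catch (Throwable t) {
                tasksFailed.incrementAndGet();
            }
        });
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Result(future.isDone(), tasksFailed.get());
    }

    public static void main(String[] args) {
        Result bad = run(true);
        // Prints: completed=false, tasksFailed=1
        System.out.println("completed=" + bad.completed + ", tasksFailed=" + bad.tasksFailed);
    }
}
```

In this sketch the failing run ends with a pending future and a counter of 1, which matches the shape of what the heap dump shows, although it does not prove that this is what happened in the broker.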
Error messages
Reproducing the issue
Currently I cannot reproduce it.
Additional information
Pulsar's current topic caching mechanism handles this case poorly. There is a timeout configured by topicLoadTimeoutSeconds (default: 60). One minute is long enough in most cases. However, when the timeout happens, the cached topic entry is not removed from BrokerService#topics until the pending PersistentTopic initialization completes. See
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
Lines 1849 to 1855 in ce102da
    persistentTopic
        .initialize()
        .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
        .thenCompose(__ -> persistentTopic.checkReplication())
        .thenCompose(v -> {
            // Also check dedup status
            return persistentTopic.checkDeduplicationStatus();
initialize():
- Create a compaction service and set topicCompactionService with it.
- Create subscriptions for all cursors in the managed ledger.
- Get namespace policies for this topic from the metadata store.
- Initialize some fields according to the namespace policies.
- Register itself to the topic policies service.
- Get topic policies and update some fields.
- Remove orphan replication cursors.
preCreateSubscriptionForCompactionIfNeeded(): Create __compaction subscription, which might open the durable cursor.
checkReplication():
- Get allowed clusters from the metadata store.
- Start replicators if necessary.
checkDeduplicationStatus(): Perform the topic replay.
If a bug makes PersistentTopic get stuck during initialization, e.g. in checkDeduplicationStatus(), the topic will be unavailable until someone intervenes manually.
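The caching behavior described above can be modeled with a simplified sketch. This is an assumption-laden illustration, not the BrokerService implementation: `TopicCacheDemo`, `loadTopic`, `isCached`, and `timesOut` are hypothetical names, and the `initialization` future stands in for the whole chain of initialization steps. The point is that a caller-side timeout does not evict the cache entry, so every retry receives the same pending future until initialization itself finishes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TopicCacheDemo {
    // Simplified model of a cache from topic name to the future of the loaded topic.
    private final Map<String, CompletableFuture<String>> topics = new ConcurrentHashMap<>();

    // 'initialization' is a hypothetical stand-in for the chained initialization steps.
    CompletableFuture<String> loadTopic(String name, CompletableFuture<Void> initialization) {
        CompletableFuture<String> topicFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = topics.putIfAbsent(name, topicFuture);
        if (existing != null) {
            // A load is already cached: the retry shares its fate.
            return existing;
        }
        // The entry is evicted only when initialization itself fails.
        initialization.whenComplete((ignore, ex) -> {
            if (ex != null) {
                topics.remove(name, topicFuture);
                topicFuture.completeExceptionally(ex);
            } else {
                topicFuture.complete(name);
            }
        });
        return topicFuture;
    }

    boolean isCached(String name) {
        return topics.containsKey(name);
    }

    // Returns true if the future is still pending after the given wait.
    static boolean timesOut(CompletableFuture<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TopicCacheDemo cache = new TopicCacheDemo();
        CompletableFuture<Void> stuckInit = new CompletableFuture<>(); // never completed
        CompletableFuture<String> first = cache.loadTopic("demo-partition-0", stuckInit);
        System.out.println("first load timed out: " + timesOut(first, 50));
        CompletableFuture<String> retry = cache.loadTopic("demo-partition-0", new CompletableFuture<>());
        System.out.println("retry got same stuck future: " + (retry == first));
        System.out.println("still cached: " + cache.isCached("demo-partition-0"));
    }
}
```

In this model the retry never gets a fresh attempt, because the stale entry shadows it. Only completing `stuckInit` (successfully or exceptionally) changes the state of the cache.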
Are you willing to submit a PR?
- I'm willing to submit a PR!