Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
All released Pulsar versions
Issue Description
In Pulsar Broker's Managed Ledger, there's a concept of an "active" cursor and an "inactive" cursor. The broker will only cache produced entries when there's at least one "active" cursor.
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
Lines 261 to 269 in a14794a
```java
// Don't insert to the entry cache for the ShadowManagedLedger
if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) {
    // Avoid caching entries if no cursor has been created
    EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
    // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
    // insert
    ml.entryCache.insert(entry);
    entry.release();
}
```
When a new subscription creates a new cursor, the cursor is made "active" by default.
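The effect of the `hasActiveCursors()` guard can be illustrated with a small toy model. This is a sketch, not Pulsar code: `ToyManagedLedger`, `ToyCursor`, `addEntry` and `cachedEntries` are hypothetical names, entries are reduced to their IDs, and the cache has no eviction. It assumes, as described above, that a newly opened cursor starts out active.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy cursor: only tracks whether it is active.
final class ToyCursor {
    private boolean active = true; // new cursors start out active

    boolean isActive() { return active; }
    void setActive() { active = true; }
    void setInactive() { active = false; }
}

// Hypothetical toy ledger: caches an appended entry only while at least one
// cursor is active, mirroring the ml.hasActiveCursors() check in OpAddEntry.
final class ToyManagedLedger {
    private final List<ToyCursor> cursors = new ArrayList<>();
    private final List<Long> cache = new ArrayList<>(); // unbounded, no eviction
    private long nextEntryId = 0;

    ToyCursor openCursor() {
        ToyCursor cursor = new ToyCursor();
        cursors.add(cursor);
        return cursor;
    }

    boolean hasActiveCursors() {
        for (ToyCursor c : cursors) {
            if (c.isActive()) {
                return true;
            }
        }
        return false;
    }

    // Appends an entry and caches it only if some cursor is active.
    long addEntry() {
        long entryId = nextEntryId++;
        if (hasActiveCursors()) {
            cache.add(entryId);
        }
        return entryId;
    }

    int cachedEntries() { return cache.size(); }
}
```

With this model, an entry appended before any cursor exists is not cached, an entry appended after `openCursor()` is cached, and nothing is cached again once every cursor has been marked inactive.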
A cursor can get inactivated when it has "fallen behind":
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 3448 to 3456 in d275bd4
```java
private void checkBackloggedCursor(PersistentSubscription subscription) {
    // activate caught up cursor which include consumers
    if (!subscription.getConsumers().isEmpty()
            && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
        subscription.getCursor().setActive();
    } else {
        subscription.getCursor().setInactive();
    }
}
```
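The decision made by `checkBackloggedCursor` reduces to a single predicate. The sketch below restates it as a standalone function; `BackloggedCursorCheck` and `shouldBeActive` are hypothetical names, and `backlogEntries` stands in for the value of `subscription.getCursor().getNumberOfEntries()`. Note that the `else` branch also deactivates a subscription that has no consumers, even when its backlog is empty.

```java
// Hypothetical restatement of the condition in checkBackloggedCursor.
final class BackloggedCursorCheck {
    private BackloggedCursorCheck() {}

    // Active only if the subscription has at least one consumer AND the
    // cursor's entry count is below the backlog threshold; otherwise inactive.
    static boolean shouldBeActive(int consumerCount, long backlogEntries, long thresholdEntries) {
        return consumerCount > 0 && backlogEntries < thresholdEntries;
    }
}
```

For example, with a threshold of 1000 entries, `shouldBeActive(1, 10, 1000)` is `true`, while both `shouldBeActive(0, 0, 1000)` (no consumers) and `shouldBeActive(1, 5000, 1000)` (too far behind) are `false`.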
When the last consumer of a subscription leaves, the subscription deactivates its cursor:
Lines 343 to 356 in d275bd4
```java
@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
    cursor.updateLastActive();
    if (dispatcher != null) {
        dispatcher.removeConsumer(consumer);
    }
    // preserve accumulative stats form removed consumer
    ConsumerStatsImpl stats = consumer.getStats();
    bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
    msgOutFromRemovedConsumer.add(stats.msgOutCounter);
    if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
        deactivateCursor();
```
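The asymmetry between leaving and rejoining can be reproduced with a toy subscription. In this hypothetical sketch (`ToySubscription` is not a Pulsar class), `removeConsumer` deactivates the cursor when the last consumer leaves, `addConsumer` leaves the cursor state unchanged, and only `checkBacklogged`, standing in for the periodic `checkBackloggedCursor`, turns it back on.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy subscription modelling when its cursor is (in)active.
final class ToySubscription {
    private final Set<String> consumers = new HashSet<>();
    private boolean cursorActive = true; // a new subscription's cursor starts active

    void addConsumer(String consumerName) {
        consumers.add(consumerName);
        // Cursor state is deliberately left untouched here.
    }

    void removeConsumer(String consumerName) {
        consumers.remove(consumerName);
        if (consumers.isEmpty()) {
            cursorActive = false; // corresponds to deactivateCursor()
        }
    }

    // Stand-in for the periodic checkBackloggedCursor call.
    void checkBacklogged(long backlogEntries, long thresholdEntries) {
        cursorActive = !consumers.isEmpty() && backlogEntries < thresholdEntries;
    }

    boolean isCursorActive() { return cursorActive; }
}
```

In this model, after a consumer leaves and rejoins, `isCursorActive()` stays `false` until `checkBacklogged` runs, which is the window in which the broker skips the entry cache.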
When a consumer rejoins, the cursor doesn't get reactivated immediately. That only happens when the `checkBackloggedCursor` method is called as part of the statistics update job, which runs once per minute:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
Lines 147 to 149 in f5c1ad2
```java
// this task: helps to activate inactive-backlog-cursors which have caught up and
// connected, also deactivate active-backlog-cursors which has backlog
topic.checkBackloggedCursors();
```
Error messages
Reproducing the issue
The problem is explained in the description.
It should be possible to reproduce this by creating a topic and a subscription, connecting a consumer, and keeping a producer actively producing to the topic. At this point, the broker cache hit rate should be very close to 100%.
After the consumer disconnects and reconnects, the broker won't use the cache until the scheduled job has run, which can take up to 60 seconds.
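The "up to 60 seconds" bound follows from the fixed one-minute schedule: a consumer that reconnects at time `t` waits until the next run of the job. A minimal sketch of that arithmetic, assuming the job fires at `t = 0, period, 2 * period, ...` and that a run coinciding with the reconnect happens just before it (`ReactivationDelay` and `secondsUntilNextCheck` are hypothetical names):

```java
// Hypothetical helper: how long the cursor stays inactive after a reconnect.
final class ReactivationDelay {
    private ReactivationDelay() {}

    // Assumes the stats job fires at multiples of periodSeconds and that a run
    // at the exact reconnect instant has already happened, so the result is
    // always in the range (0, periodSeconds].
    static long secondsUntilNextCheck(long reconnectAtSeconds, long periodSeconds) {
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("periodSeconds must be positive");
        }
        long sinceLastRun = Math.floorMod(reconnectAtSeconds, periodSeconds);
        return periodSeconds - sinceLastRun;
    }
}
```

For example, with a 60-second period, a consumer that reconnects 10 seconds after a run (`t = 130`) leaves the cache unused for another 50 seconds, and one that reconnects right at a run waits the full 60 seconds.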
Additional information
I already have a fix as part of PR #24623. I have created this issue to describe the problem and so that I can submit a separate PR to fix the issue.
Are you willing to submit a PR?
- I'm willing to submit a PR!