[Bug] Broker won't immediately cache produced entries when all consumers leave and the first consumer joins #24656

@lhotari

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

All released Pulsar versions

Issue Description

In Pulsar Broker's Managed Ledger, there's a concept of an "active" cursor and an "inactive" cursor. The broker will only cache produced entries when there's at least one "active" cursor.

// Don't insert to the entry cache for the ShadowManagedLedger
if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) {
    // Avoid caching entries if no cursor has been created
    EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
    // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
    // insert
    ml.entryCache.insert(entry);
    entry.release();
}

When a new cursor is created by a new subscription, it is made "active" by default.

A cursor can get inactivated when it has "fallen behind":

private void checkBackloggedCursor(PersistentSubscription subscription) {
    // activate caught up cursor which include consumers
    if (!subscription.getConsumers().isEmpty()
            && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
        subscription.getCursor().setActive();
    } else {
        subscription.getCursor().setInactive();
    }
}
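
To make the rule easier to see, the condition above can be restated as a small pure function. This is only a sketch of the decision logic, not Pulsar code: the class name `BackloggedCursorRule`, the method `shouldBeActive`, and the threshold value of 1000 are made up for illustration.

```java
// Toy restatement of the condition in checkBackloggedCursor; not Pulsar code.
final class BackloggedCursorRule {
    private BackloggedCursorRule() {
    }

    /**
     * A cursor should be active only when at least one consumer is connected
     * and the backlog is below the threshold.
     */
    static boolean shouldBeActive(int consumerCount, long backlogEntries, long thresholdEntries) {
        return consumerCount > 0 && backlogEntries < thresholdEntries;
    }

    public static void main(String[] args) {
        long threshold = 1000; // illustrative value only
        System.out.println(shouldBeActive(1, 10, threshold));   // connected and caught up: true
        System.out.println(shouldBeActive(0, 10, threshold));   // no consumers: false
        System.out.println(shouldBeActive(1, 5000, threshold)); // fallen behind: false
    }
}
```

Note that the rule is only evaluated when the method is invoked, so the cursor state can lag behind the actual consumer state between invocations.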

When the last consumer of a subscription leaves, the subscription deactivates the cursor:

@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
    cursor.updateLastActive();
    if (dispatcher != null) {
        dispatcher.removeConsumer(consumer);
    }
    // preserve accumulative stats from removed consumer
    ConsumerStatsImpl stats = consumer.getStats();
    bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
    msgOutFromRemovedConsumer.add(stats.msgOutCounter);
    if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
        deactivateCursor();
        // ... (excerpt ends here; the rest of the method is omitted)

When a consumer rejoins, the cursor won't be activated immediately. Activation doesn't happen until the checkBackloggedCursor method is called as part of the statistics update job, which runs once per minute:

// this task: helps to activate inactive-backlog-cursors which have caught up and
// connected, also deactivate active-backlog-cursors which has backlog
topic.checkBackloggedCursors();
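
The sequence described above can be replayed with a toy model. This is a simplified sketch and not the broker's implementation: `CursorActivationTimeline` and all of its methods are hypothetical names, a single subscription is assumed, and the backlog is assumed to stay below the threshold so that only the consumer count matters.

```java
// Toy model of the behaviour described in this issue; none of this is Pulsar code.
final class CursorActivationTimeline {
    private boolean cursorActive = true; // a newly created cursor starts out active
    private int consumers = 0;
    private int cachedCount = 0;

    /** A consumer connects. As in the issue, this does not reactivate the cursor. */
    void addConsumer() {
        consumers++;
    }

    /** A consumer disconnects; the cursor is deactivated when the last one leaves. */
    void removeConsumer() {
        consumers--;
        if (consumers == 0) {
            cursorActive = false;
        }
    }

    /** Stand-in for the periodic check (backlog assumed to be under the threshold). */
    void runPeriodicCheck() {
        cursorActive = consumers > 0;
    }

    /** Produces one entry and returns true if it was inserted into the (simulated) cache. */
    boolean produce() {
        if (cursorActive) {
            cachedCount++;
            return true;
        }
        return false;
    }

    int cachedCount() {
        return cachedCount;
    }

    public static void main(String[] args) {
        CursorActivationTimeline topic = new CursorActivationTimeline();
        topic.addConsumer();
        System.out.println("consumer connected, cached: " + topic.produce());
        topic.removeConsumer();
        topic.addConsumer();
        System.out.println("after reconnect, cached: " + topic.produce());
        topic.runPeriodicCheck();
        System.out.println("after periodic check, cached: " + topic.produce());
    }
}
```

In this model, entries produced between the reconnect and the next periodic check bypass the cache even though a caught-up consumer is connected, which is the gap this issue reports.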

Error messages


Reproducing the issue

The problem is explained in the description.
It should be possible to reproduce it by creating a topic and a subscription, connecting a consumer, and having an active producer producing to the topic. At this point, the broker cache hit rate should be very close to 100%.
After the consumer disconnects and reconnects, the broker cache won't be used for newly produced entries until the scheduled job has run. This can take up to 60 seconds.

Additional information

I already have a fix as part of PR #24623. I have created this issue to describe the problem and so that I can submit a separate PR to fix the issue.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: type/bug
