[Bug] Consumer stops consuming due to inconsistent state between dispatcher and cursor #24697

@dragonls

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

Broker version: 3.0.7

Issue Description

We observed a case where a Pulsar consumer suddenly stops consuming messages (throughput drops to 0) while the system remains stable.

I captured a memory dump of the broker, and it appears to show a state inconsistency between PersistentDispatcherSingleActiveConsumer and NonDurableCursorImpl:

  1. The dispatcher's havePendingRead flag remains true (indicating an ongoing read operation)
  2. However, the cursor shows:
    • pendingReadOps = 0
    • waitingReadOp = null
    • No visible read operations in progress

Nothing appears to be wrong with the consumer/reader itself: its incomingMessages queue is empty, and it is simply waiting for the broker to dispatch messages.

If I unload the affected topic partition, consumption returns to normal.

Error messages

2025-09-01T16:20:33,644+0800 [broker-topic-workers-OrderedExecutor-5-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: Object has been recycled already.
java.lang.IllegalStateException: Object has been recycled already.
	at io.netty.util.Recycler$DefaultHandle.toAvailable(Recycler.java:284) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.Recycler$LocalPool.release(Recycler.java:337) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.Recycler$DefaultHandle.recycle(Recycler.java:257) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer$ReadEntriesCtx.recycle(PersistentDispatcherSingleActiveConsumer.java:651) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:165) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:158) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.gemini.1.jar:4.16.6.gemini.1]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:113) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.gemini.1.jar:4.16.6.gemini.1]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
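
One possible reading of this stack trace, not confirmed by anything above, is that the exception thrown by ReadEntriesCtx.recycle() aborts internalReadEntriesComplete before the dispatcher clears havePendingRead, so the flag stays true while the cursor has nothing in flight. The sketch below is a hypothetical model of that ordering, not Pulsar code: PooledCtx, DispatcherModel, and the "recycle first, clear the flag afterwards" order are all assumptions made for illustration.

```java
public class StuckFlagDemo {

    /** Hypothetical pooled context that may only be recycled once. */
    static final class PooledCtx {
        private boolean recycled;

        void recycle() {
            if (recycled) {
                // Same message as in the stack trace above.
                throw new IllegalStateException("Object has been recycled already.");
            }
            recycled = true;
        }
    }

    /** Hypothetical dispatcher model with a single pending-read flag. */
    static final class DispatcherModel {
        private boolean havePendingRead;

        void startRead() {
            havePendingRead = true;
        }

        boolean havePendingRead() {
            return havePendingRead;
        }

        /** Assumed ordering: recycle the context first, clear the flag afterwards. */
        void onReadComplete(PooledCtx ctx) {
            ctx.recycle();           // throws if the context was already recycled
            havePendingRead = false; // skipped when recycle() throws
        }
    }

    /** Flag value after a completion whose context had already been recycled elsewhere. */
    static boolean flagAfterDoubleRecycle() {
        DispatcherModel dispatcher = new DispatcherModel();
        PooledCtx ctx = new PooledCtx();
        ctx.recycle(); // assumed earlier recycle from some other code path
        dispatcher.startRead();
        try {
            dispatcher.onReadComplete(ctx);
        } catch (IllegalStateException e) {
            // Stands in for the executor that logs the error and ends the task.
        }
        return dispatcher.havePendingRead();
    }

    /** Flag value after a completion with a fresh context, for contrast. */
    static boolean flagAfterNormalCompletion() {
        DispatcherModel dispatcher = new DispatcherModel();
        dispatcher.startRead();
        dispatcher.onReadComplete(new PooledCtx());
        return dispatcher.havePendingRead();
    }

    public static void main(String[] args) {
        System.out.println("after double recycle: " + flagAfterDoubleRecycle());
        System.out.println("after normal completion: " + flagAfterNormalCompletion());
    }
}
```

In this model the flag stays true after the failed completion and is cleared after the normal one. Why the context would be recycled twice in the first place is not addressed by the sketch.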

Reproducing the issue

I cannot reproduce the issue reliably yet, but it has occurred quite often recently.

Additional information

The topic stats (pulsar-admin topics stats):

{
    "msgRateIn": 2865.830615952374,
    "msgThroughputIn": 1375341.9489821792,
    "msgRateOut": 17195.656915001044,
    "msgThroughputOut": 8252158.974261264,
    "bytesInCounter": 8025929146395,
    "msgInCounter": 16168225916,
    "bytesOutCounter": 71740195736552,
    "msgOutCounter": 141897883743,
    "averageMsgSize": 479.9104110781945,
    "msgChunkPublished": false,
    "storageSize": 106950077339,
    "backlogSize": 11346044423,
    "publishRateLimitedTimes": 0,
    "earliestMsgPublishTimeInBacklogs": 0,
    "offloadedStorageSize": 106395360285,
    "lastOffloadLedgerId": 210635025,
    "lastOffloadSuccessTimeStamp": 1756724140842,
    "lastOffloadFailureTimeStamp": 0,
    "ongoingTxnCount": 0,
    "abortedTxnCount": 0,
    "committedTxnCount": 0,
    "publishers": [...],
    "waitingPublishers": 0,
    "subscriptions": {
        ...,
        "xxx-reader-c45f5b8585": {
            "msgRateOut": 0.0,
            "msgThroughputOut": 0.0,
            "bytesOutCounter": 54528649459,
            "msgOutCounter": 104377143,
            "msgRateRedeliver": 0.0,
            "messageAckRate": 0.0,
            "chunkedMessageRate": 0,
            "msgBacklog": 2207000,
            "backlogSize": 11346039239,
            "earliestMsgPublishTimeInBacklog": 0,
            "msgBacklogNoDelayed": 2207000,
            "blockedSubscriptionOnUnackedMsgs": false,
            "msgDelayed": 0,
            "unackedMessages": 0,
            "type": "Exclusive",
            "activeConsumerName": "cdd1e",
            "msgRateExpired": 0.0,
            "totalMsgExpired": 0,
            "lastExpireTimestamp": 1756724399200,
            "lastConsumedFlowTimestamp": 1756714832707,
            "lastConsumedTimestamp": 1756714833634,
            "lastAckedTimestamp": 1756714833690,
            "lastMarkDeleteAdvancedTimestamp": 1756714833690,
            "consumers": [{
                    "msgRateOut": 0.0,
                    "msgThroughputOut": 0.0,
                    "bytesOutCounter": 54528649459,
                    "msgOutCounter": 104377143,
                    "msgRateRedeliver": 0.0,
                    "messageAckRate": 0.0,
                    "chunkedMessageRate": 0.0,
                    "consumerName": "cdd1e",
                    "availablePermits": 7857,
                    "unackedMessages": 0,
                    "avgMessagesPerEntry": 11,
                    "blockedConsumerOnUnackedMsgs": false,
                    "lastAckedTimestamp": 1756714833690,
                    "lastConsumedTimestamp": 1756714833634,
                    "lastConsumedFlowTimestamp": 1756714832707,
                    "metadata": {},
                    "address": "/xxx:54340",
                    "connectedSince": "2025-09-01T00:00:13.09212201+08:00",
                    "clientVersion": "Pulsar-Java-v3.0.13",
                    "lastAckedTime": "2025-09-01T16:20:33.69+08:00",
                    "lastConsumedTime": "2025-09-01T16:20:33.634+08:00"
                }
            ],
            "isDurable": false,
            "isReplicated": false,
            "allowOutOfOrderDelivery": false,
            "consumersAfterMarkDeletePosition": {},
            "nonContiguousDeletedMessagesRanges": 0,
            "nonContiguousDeletedMessagesRangesSerializedSize": 0,
            "delayedMessageIndexSizeInBytes": 0,
            "subscriptionProperties": {},
            "filterProcessedMsgCount": 0,
            "filterAcceptedMsgCount": 0,
            "filterRejectedMsgCount": 0,
            "filterRescheduledMsgCount": 0,
            "durable": false,
            "replicated": false
        },
        ...
    },
    "replication": {},
    "deduplicationStatus": "Disabled",
    "nonContiguousDeletedMessagesRanges": 256,
    "nonContiguousDeletedMessagesRangesSerializedSize": 5368,
    "delayedMessageIndexSizeInBytes": 0,
    "compaction": {
        "lastCompactionRemovedEventCount": 0,
        "lastCompactionSucceedTimestamp": 0,
        "lastCompactionFailedTimestamp": 0,
        "lastCompactionDurationTimeInMills": 0
    },
    "ownerBroker": "xxx:8080"
}
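
To relate these stats to the error log, the epoch-millisecond timestamps can be converted to the broker's +08:00 offset. The sketch below uses only java.time and values copied from this report; the class and method names are illustrative. It converts lastConsumedTimestamp and computes the gap to the time of the ERROR log line (2025-09-01T16:20:33,644+0800).

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class StallTimeline {

    // Offset used in the log line and in the stats' human-readable times.
    static final ZoneOffset BROKER_OFFSET = ZoneOffset.ofHours(8);

    /** Converts an epoch-millisecond stat value to a timestamp at the broker's offset. */
    static OffsetDateTime fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis).atOffset(BROKER_OFFSET);
    }

    /** Milliseconds elapsed from {@code from} to {@code to}. */
    static long millisBetween(OffsetDateTime from, OffsetDateTime to) {
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // lastConsumedTimestamp of the stalled subscription.
        OffsetDateTime lastConsumed = fromEpochMillis(1756714833634L);
        // Timestamp of the "Object has been recycled already" log line.
        OffsetDateTime errorLogged =
                OffsetDateTime.of(2025, 9, 1, 16, 20, 33, 644_000_000, BROKER_OFFSET);

        System.out.println("last consumed: " + lastConsumed);
        System.out.println("error logged:  " + errorLogged);
        System.out.println("gap (ms):      " + millisBetween(lastConsumed, errorLogged));
    }
}
```

With these values, the last dispatch to the consumer (16:20:33.634) precedes the logged exception by 10 ms, which is consistent with the stall starting at the failed read completion, though it does not prove it.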

The topic stats-internal (pulsar-admin topics stats-internal):

{
    "entriesAddedCounter": 1548744096,
    "numberOfEntries": 20398660,
    "totalSize": 106975197845,
    "currentLedgerEntries": 112446,
    "currentLedgerSize": 579840850,
    "lastLedgerCreatedTimestamp": "2025-09-01T18:54:52.871+08:00",
    "waitingCursorsCount": 0,
    "pendingAddEntriesCount": 1,
    "lastConfirmedEntry": "210637258:112444",
    "state": "LedgerOpened",
    "ledgers": [{
            "ledgerId": 210321221,
            "entries": 181771,
            "size": 937919906,
            "offloaded": true,
            "underReplicated": false,
            "bookkeeperDeleted": true
        }, ..., {
            "ledgerId": 210637258,
            "entries": 0,
            "size": 0,
            "offloaded": false,
            "underReplicated": false,
            "bookkeeperDeleted": false
        }
    ],
    "cursors": {
        ...,
        "xxx-reader-c45f5b8585": {
            "markDeletePosition": "210600931:90642",
            "readPosition": "210600931:90643",
            "waitingReadOp": false,
            "pendingReadOps": 0,
            "messagesConsumedCounter": 1546532189,
            "cursorLedger": -1,
            "cursorLedgerLastEntry": -1,
            "individuallyDeletedMessages": "[]",
            "lastLedgerSwitchTimestamp": "2025-09-01T00:00:13.091+08:00",
            "state": "Open",
            "active": false,
            "numberOfEntriesSinceFirstNotAckedMessage": 1,
            "totalNonContiguousDeletedMessagesRange": 0,
            "subscriptionHavePendingRead": true,
            "subscriptionHavePendingReplayRead": false,
            "properties": {}
        },
        ...
    },
    "schemaLedgers": [],
    "compactedLedger": {
        "ledgerId": -1,
        "entries": -1,
        "size": -1,
        "offloaded": false,
        "underReplicated": false,
        "bookkeeperDeleted": false
    }
}
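
The combination of fields that characterizes this stall can be written as a simple predicate over values taken from stats and stats-internal. This is only a sketch: the class and method names are hypothetical, the inputs are assumed to have been extracted from the JSON already, and a single matching sample may just be a normal read in flight, so the condition should hold across several samples before a topic is treated as stuck.

```java
public class StuckReadCheck {

    /**
     * Returns true when the dispatcher claims a pending read while the cursor
     * reports none, even though there is backlog and the consumer has permits.
     *
     * @param subscriptionHavePendingRead cursor entry in stats-internal
     * @param waitingReadOp               cursor entry in stats-internal
     * @param pendingReadOps              cursor entry in stats-internal
     * @param msgBacklog                  subscription entry in stats
     * @param availablePermits            consumer entry in stats
     */
    static boolean looksStuck(boolean subscriptionHavePendingRead,
                              boolean waitingReadOp,
                              int pendingReadOps,
                              long msgBacklog,
                              int availablePermits) {
        boolean dispatcherThinksReading = subscriptionHavePendingRead;
        boolean cursorIsIdle = !waitingReadOp && pendingReadOps == 0;
        boolean workIsAvailable = msgBacklog > 0 && availablePermits > 0;
        return dispatcherThinksReading && cursorIsIdle && workIsAvailable;
    }

    public static void main(String[] args) {
        // Values reported above for subscription xxx-reader-c45f5b8585.
        System.out.println(looksStuck(true, false, 0, 2_207_000L, 7857));
    }
}
```

For the values reported in this issue the predicate returns true.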

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels

type/bug: The PR fixed a bug or issue reported a bug
