
[Bug] Cannot determine whether the message is a duplicate at this time #21892

@graysonzeng

Description


Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar version: 3.1.1, master

Minimal reproduce step

broker count: 2
bookie count: 5

broker config:
managedLedgerDefaultAckQuorum: "2"
managedLedgerDefaultEnsembleSize: "4"
managedLedgerDefaultWriteQuorum: "3"

// Enable deduplication
brokerDeduplicationEnabled: "true"

// Enable the broker entry metadata interceptor
brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

The producer has batching enabled (the default).

Using pulsar-perf, publish at a rate of 200000 messages/sec with a total of 100000000 messages, and consume at the same time.

bin/pulsar-perf produce persistent://pulsar/default/input_test -r 200000 -m 10000000

At the same time, use a Pulsar Function to consume and re-produce the messages, setting the sequenceId on the producer inside the function (EFFECTIVELY_ONCE mode).
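The function is roughly equivalent to the sketch below. The class name, output topic, schema, and the way the sequenceId is derived are illustrative assumptions, not the exact function used in the test; it is deployed with --processing-guarantees EFFECTIVELY_ONCE.

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class RepublishWithSequenceIdFunction implements Function<byte[], Void> {

    @Override
    public Void process(byte[] input, Context context) throws Exception {
        // Derive a monotonically increasing sequenceId for the output message
        // (illustrative: here the record sequence of the input record is reused).
        long sequenceId = context.getCurrentRecord().getRecordSequence().orElse(0L);

        // Re-publish with an explicit sequenceId so that broker-side deduplication applies.
        context.newOutputMessage(context.getOutputTopic(), Schema.BYTES)
               .value(input)
               .sequenceId(sequenceId)
               .sendAsync();
        return null;
    }
}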

What did you expect to see?

Production and consumption of all messages complete successfully.

What did you see instead?

The producer falls into the following error and remains stuck until the broker is restarted.

2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-3] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Re-Sending 1142 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 1 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-9] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 4340 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 395 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 1482 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 10 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 454 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 - R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [21.24.16.52/21.24.16.52:6650] Got exception io.netty.channel.StacklessClosedChannelException
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Disconnected
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-8] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-6-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s

Anything else?

After the producer got stuck, a heap dump of the broker was taken and something unusual was discovered.


The pendingAddOps queue of the LedgerHandle also retains a lot of requests. The first request in the queue is not completed, with pendingWriteRequests = 0 and an empty addEntrySuccessBookies.


But the second request is in the completed state.


In MessageDeduplication#isDuplicate, the sequenceId falls between lastSequenceIdPersisted and highestSequencedPushed; this is why we receive the "Cannot determine whether the message is a duplicate at this time" error:

                if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
                    return MessageDupStatus.Dup;
                } else {
                    return MessageDupStatus.Unknown;
                }
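For context, the surrounding branch of MessageDeduplication#isDuplicate looks roughly like the following (paraphrased, not a verbatim copy of the Pulsar source):

Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
    // The sequenceId was already pushed towards BookKeeper, but it may not be persisted yet.
    Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
    if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
        // Already persisted -> definitely a duplicate.
        return MessageDupStatus.Dup;
    } else {
        // Pushed but not yet persisted -> the broker cannot tell, so it returns Unknown,
        // which the producer sees as "Cannot determine whether the message is a duplicate at this time".
        return MessageDupStatus.Unknown;
    }
}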

The client received this error, then disconnected and resent the messages. The resent messages still had sequenceId > lastSequenceIdPersisted, so the producer fell into a loop.
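The client-side behaviour that drives this loop is roughly the following (a paraphrased sketch of ClientCnx#handleSendError, not a verbatim copy): PersistenceError has no dedicated case, so it falls through to the default branch, which closes the connection and lets the producer reconnect and re-send everything pending.

switch (sendError.getError()) {
    case ChecksumError:
        producers.get(producerId).recoverChecksumError(this, sequenceId);
        break;
    case TopicTerminatedError:
        producers.get(producerId).terminated(this);
        break;
    default:
        // For other errors (including PersistenceError), close the connection so the
        // reconnection logic re-establishes the producer; the pending messages are then
        // re-sent with the same sequenceIds (> lastSequenceIdPersisted), hence the loop.
        ctx.close();
}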

Update

An important log message was found:

14:29:19.961 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
	at io.netty.buffer.AbstractByteBuf.checkIndexBounds(AbstractByteBuf.java:112) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:144) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:56) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:42) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.UnpooledDuplicatedByteBuf.<init>(UnpooledDuplicatedByteBuf.java:24) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedDuplicateByteBuf.<init>(AbstractPooledDerivedByteBuf.java:164) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf.duplicate0(AbstractPooledDerivedByteBuf.java:157) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.PooledSlicedByteBuf.duplicate(PooledSlicedByteBuf.java:118) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf$Component.duplicate(CompositeByteBuf.java:1947) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf.component(CompositeByteBuf.java:1556) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSendingV2(DigestManager.java:149) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSending(DigestManager.java:106) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.PendingAddOp.initiate(PendingAddOp.java:246) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.doAsyncAddEntry(LedgerHandle.java:1363) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.asyncAddEntry(LedgerHandle.java:1061) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:144) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:862) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:794) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

It points to BookKeeper's DigestManager.computeDigestAndPackageForSendingV2():

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L149

private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length,
                                                                 ByteBuf data, byte[] masterKey, int flags) {
        ......
        if (unwrapped instanceof CompositeByteBuf) {
            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
            for (int i = 0; i < cbb.numComponents(); i++) {
                // throws an IndexOutOfBoundsException here
                ByteBuf b = cbb.component(i);
                digest = update(digest, b, b.readerIndex(), b.readableBytes());
            }
        } else {
            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
        }
        ......
    }

Under normal circumstances, after the digest computation the result is assigned to toSend and the payload is set to null:

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L231

public synchronized void initiate() {
       ......
        this.toSend = lh.macManager.computeDigestAndPackageForSending(
                entryId, lh.lastAddConfirmed, currentLedgerLength,
                payload, lh.ledgerKey, flags);
        // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
        payload = null;
       ......
    }

Therefore, we can see that the head (peek) of pendingAddOps still retains its payload, while toSend is empty.


In BookKeeper's PendingAddOp.unsetSuccessAndSendWriteRequest(), if toSend is null the method returns directly, so this request has been retained in pendingAddOps ever since the digest computation failed; see the paraphrased excerpt after the link below.
https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L183
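The guard at that line is roughly the following (paraphrased from the BookKeeper source, not a verbatim copy):

void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
    if (toSend == null) {
        // This addOp has not had its digest computed yet; normally the write requests
        // are sent once the digest is ready, so there is nothing to resend here.
        // But when computeDigestAndPackageForSending threw, toSend stays null forever,
        // so the op is never resent and never completed.
        return;
    }
    ......
}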

Are you willing to submit a PR?

  • I'm willing to submit a PR!
