Description
Search before asking
- I searched in the issues and found nothing similar.
Version
Pulsar version: 3.1.1, master
Minimal reproduce step
broker count: 2
bookie count: 5
broker config:
managedLedgerDefaultAckQuorum: "2"
managedLedgerDefaultEnsembleSize: "4"
managedLedgerDefaultWriteQuorum: "3"
# enable deduplication
brokerDeduplicationEnabled: "true"
# enable the broker entry metadata interceptor
brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Enable batch producer by default
Use pulsar-perf to publish at 200,000 messages/sec, 100,000,000 messages in total, while consuming at the same time:
bin/pulsar-perf produce persistent://pulsar/default/input_test -r 200000 -m 10000000
In parallel, run a function that consumes these messages and produces new ones, setting the sequenceId on the producer inside the function (EFFECTIVELY_ONCE mode).
What did you expect to see?
All messages are produced and consumed successfully.
What did you see instead?
The producer falls into the following error and stays stuck, looping on it until the broker is restarted:
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-3] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Re-Sending 1142 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 1 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-9] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 4340 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 395 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 1482 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 10 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 454 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 - R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [21.24.16.52/21.24.16.52:6650] Got exception io.netty.channel.StacklessClosedChannelException
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Disconnected
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-8] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-6-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
Anything else?
After the producer got stuck, a heap dump of the broker was taken, and something unusual was discovered: the pendingAddOps queue of the LedgerHandle still retains a lot of requests. The first request in the queue is not in completed state, with pendingWriteRequests = 0 and an empty addEntrySuccessBookies set, yet the second request is already in completed state.
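This matches how BookKeeper acknowledges adds in order: completed ops are drained from the head of the pending queue, so a head entry that never completes blocks every entry behind it, even already-completed ones. A minimal sketch of that head-of-line blocking (a simplified model, not the actual LedgerHandle code):

```java
import java.util.ArrayDeque;

public class PendingQueueSketch {
    static class Op {
        final long entryId;
        boolean completed;
        Op(long entryId) { this.entryId = entryId; }
    }

    // Drain completed ops from the head, in order; stop at the first
    // incomplete op. Returns the number of ops acknowledged.
    static int drain(ArrayDeque<Op> pendingAddOps) {
        int acked = 0;
        while (!pendingAddOps.isEmpty() && pendingAddOps.peek().completed) {
            pendingAddOps.poll();
            acked++;
        }
        return acked;
    }

    public static void main(String[] args) {
        ArrayDeque<Op> q = new ArrayDeque<>();
        Op first = new Op(0);    // like the stuck head: never completes
        Op second = new Op(1);
        second.completed = true; // like the second request: already completed
        q.add(first);
        q.add(second);
        // Nothing can be acknowledged while the head is incomplete.
        System.out.println("acked=" + drain(q) + " pending=" + q.size());
    }
}
```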
In MessageDeduplication.isDuplicate() (MessageDeduplication.java, line 381 in commit c834feb), the sequenceId falls between lastSequenceIdPersisted and highestSequencedPushed, which is why we receive the Cannot determine whether the message is a duplicate at this time error:
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
}
The client received this error, disconnected, and resent the messages. The resent messages still had sequenceId > lastSequenceIdPersisted, so the producer fell into a loop.
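The decision can be modeled as a three-way check on the sequenceId. The following is a simplified model of the logic (the names mirror Pulsar's MessageDeduplication fields, but this is a sketch, not the actual implementation, which tracks these values per producer):

```java
public class DedupSketch {
    enum MessageDupStatus { NotDup, Dup, Unknown }

    // Simplified model: sequenceIds above highestSequencedPushed are new,
    // sequenceIds at or below lastSequenceIdPersisted are known duplicates,
    // and anything in between cannot be decided yet.
    static MessageDupStatus isDuplicate(long sequenceId,
                                        long lastSequenceIdPersisted,
                                        long highestSequencedPushed) {
        if (sequenceId > highestSequencedPushed) {
            return MessageDupStatus.NotDup;
        }
        if (sequenceId <= lastSequenceIdPersisted) {
            return MessageDupStatus.Dup;
        }
        // Pushed to the ledger but not yet persisted: the broker replies
        // "Cannot determine whether the message is a duplicate at this time".
        return MessageDupStatus.Unknown;
    }

    public static void main(String[] args) {
        // lastSequenceIdPersisted = 100, highestSequencedPushed = 200
        System.out.println(isDuplicate(250, 100, 200)); // new message
        System.out.println(isDuplicate(50, 100, 200));  // known duplicate
        System.out.println(isDuplicate(150, 100, 200)); // the stuck window
    }
}
```

Because the failed add never persists, lastSequenceIdPersisted never advances past the resent sequenceIds, so every retry lands in the Unknown window again.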
Update
An important log message was found:
14:29:19.961 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
at io.netty.buffer.AbstractByteBuf.checkIndexBounds(AbstractByteBuf.java:112) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:144) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:56) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:42) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.UnpooledDuplicatedByteBuf.<init>(UnpooledDuplicatedByteBuf.java:24) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedDuplicateByteBuf.<init>(AbstractPooledDerivedByteBuf.java:164) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.AbstractPooledDerivedByteBuf.duplicate0(AbstractPooledDerivedByteBuf.java:157) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.PooledSlicedByteBuf.duplicate(PooledSlicedByteBuf.java:118) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.CompositeByteBuf$Component.duplicate(CompositeByteBuf.java:1947) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at io.netty.buffer.CompositeByteBuf.component(CompositeByteBuf.java:1556) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSendingV2(DigestManager.java:149) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSending(DigestManager.java:106) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
at org.apache.bookkeeper.client.PendingAddOp.initiate(PendingAddOp.java:246) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
at org.apache.bookkeeper.client.LedgerHandle.doAsyncAddEntry(LedgerHandle.java:1363) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
at org.apache.bookkeeper.client.LedgerHandle.asyncAddEntry(LedgerHandle.java:1061) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:144) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:862) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:794) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.104.Final.jar:4.1.104.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
It points to BookKeeper's DigestManager.computeDigestAndPackageForSendingV2():
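The exception message encodes Netty's ByteBuf index invariant, 0 <= readerIndex <= writerIndex <= capacity. A buffer whose readerIndex (31215) has moved past its writerIndex (21324), for example because the underlying component was mutated or reused while still referenced, fails this check as soon as it is duplicated. A minimal model of the check (mirroring the invariant stated in the message, not Netty's actual code):

```java
public class IndexBoundsSketch {
    // Models the invariant enforced by Netty's AbstractByteBuf.checkIndexBounds().
    static void checkIndexBounds(int readerIndex, int writerIndex, int capacity) {
        if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity) {
            throw new IndexOutOfBoundsException(String.format(
                "readerIndex: %d, writerIndex: %d"
                + " (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                readerIndex, writerIndex, capacity));
        }
    }

    public static void main(String[] args) {
        checkIndexBounds(0, 21324, 65536);     // a consistent buffer passes
        checkIndexBounds(31215, 21324, 65536); // throws: readerIndex > writerIndex
    }
}
```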
private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length,
                                                             ByteBuf data, byte[] masterKey, int flags) {
    // ...
    if (unwrapped instanceof CompositeByteBuf) {
        CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
        for (int i = 0; i < cbb.numComponents(); i++) {
            // throws an IndexOutOfBoundsException
            ByteBuf b = cbb.component(i);
            digest = update(digest, b, b.readerIndex(), b.readableBytes());
        }
    } else {
        digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
    }
    // ...
}
Under normal circumstances, after the digest computation, the result is assigned to toSend and payload is set to null:
public synchronized void initiate() {
    // ...
    this.toSend = lh.macManager.computeDigestAndPackageForSending(
        entryId, lh.lastAddConfirmed, currentLedgerLength,
        payload, lh.ledgerKey, flags);
    // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
    payload = null;
    // ...
}
Therefore, in the heap dump the head of pendingAddOps still retains its payload, while toSend is null.
In BookKeeper's PendingAddOp.unsetSuccessAndSendWriteRequest(), if toSend is null the method returns immediately, so this request has been stuck in pendingAddOps ever since the digest computation failed:
https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L183
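Putting the pieces together: because computeDigestAndPackageForSending() threw, initiate() never reached the lines that set toSend and null out payload, and the retry path bails out on toSend == null, so the op can neither complete nor be re-sent. A simplified model of that stuck state (a hypothetical class mirroring the fields of BookKeeper's PendingAddOp, not the actual implementation):

```java
public class PendingAddOpSketch {
    Object payload = new Object(); // stands in for the entry's ByteBuf
    Object toSend = null;
    boolean completed = false;

    // Models PendingAddOp.initiate(): if the digest computation throws,
    // toSend is never assigned and payload is never handed off or nulled.
    void initiate(boolean digestThrows) {
        if (digestThrows) {
            throw new IndexOutOfBoundsException("digest computation failed");
        }
        toSend = payload; // ownership passed to the packaged buffer
        payload = null;
    }

    // Models unsetSuccessAndSendWriteRequest(): with toSend == null the
    // retry is skipped, so the op stays in pendingAddOps indefinitely.
    boolean unsetSuccessAndSendWriteRequest() {
        if (toSend == null) {
            return false; // the early return seen in the real code
        }
        completed = false;
        return true; // write request would be re-sent
    }

    public static void main(String[] args) {
        PendingAddOpSketch op = new PendingAddOpSketch();
        try {
            op.initiate(true); // digest computation fails
        } catch (IndexOutOfBoundsException expected) {
            // swallowed by the executor thread in the real system
        }
        // Matches the heap dump: payload retained, toSend null, retry skipped.
        System.out.println("payload=" + (op.payload != null)
            + " toSend=" + (op.toSend != null)
            + " retried=" + op.unsetSuccessAndSendWriteRequest());
    }
}
```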
Are you willing to submit a PR?
- I'm willing to submit a PR!


