-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Closed
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- Java Client
- commit id: 7547fab
Issue Description
PersistentAcknowledgmentsGroupingTracker#flush can be called concurrently from different threads: one call comes from the scheduled task that runs according to acknowledgementsGroupTimeMicros, and the other comes from the thread that calls Consumer#acknowledgeAsync.
When calling flush concurrently, the access to pendingIndividualBatchIndexAcks is not thread safe:
Lines 487 to 495 in 415c6fa
if (!pendingIndividualBatchIndexAcks.isEmpty()) {
    Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> iterator =
            pendingIndividualBatchIndexAcks.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next();
        entriesToAck.add(Triple.of(
                entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue()));
        iterator.remove();
- Thread 1: Add the ConcurrentBitSetRecyclable to entriesToAck
- Thread 2: Add the same ConcurrentBitSetRecyclable to entriesToAck
- Thread 1: Remove the entry that holds that bit set
- Thread 2: Remove the entry that holds that bit set (and will fail)
Then two threads will share the same ConcurrentBitSetRecyclable object and call recycle twice.
Error messages
Reproducing the issue
It's hard to reproduce without injecting a delay. Here is a patch that applies on top of 7547fab:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 5f7957d7f1..f982108c1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -445,6 +445,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
+ log.info("XYZ flush {} individual ack and {} batch index ack", pendingIndividualAcks.size(),
+ pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
// We can send 1 single protobuf command with all individual acks
@@ -492,6 +494,10 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next();
entriesToAck.add(Triple.of(
entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue()));
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ }
iterator.remove();
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index bbf1265489..7c28af5e3e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -419,6 +419,30 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
+ @Test
+ public void testConcurrentFlush() throws Exception {
+ when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
+ final var conf = new ConsumerConfigurationData<byte[]>();
+ conf.setBatchIndexAckEnabled(true);
+ conf.setMaxAcknowledgmentGroupSize(2);
+ conf.setAckReceiptEnabled(false); // disable the lock when flushing
+ conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(1));
+ final var tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+
+ final var batchSize = 3;
+ final var bitSet = new BitSet(batchSize);
+ bitSet.set(0, batchSize);
+ tracker.addAcknowledgment(new BatchMessageIdImpl(0L, 0L, -1, 0, batchSize, bitSet), AckType.Individual, null);
+
+ // Wait for the flush
+ Thread.sleep(1200);
+
+ // Trigger the flush again
+ for (int i = 0; i < 2; i++) {
+ tracker.addAcknowledgment(new MessageIdImpl(0L, 1L + i, -1), AckType.Individual, null);
+ }
+ }
+
public class ClientCnxTest extends ClientCnx {
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
Run this test, and you will see output similar to the following:
2025-09-10T21:10:13,921+0800 INFO [nioEventLoopGroup-2-1] o.a.p.c.i.PersistentAcknowledgmentsGroupingTracker - XYZ flush 0 individual ack and 1 batch index ack {}
2025-09-10T21:10:14,124+0800 INFO [main] o.a.p.c.i.PersistentAcknowledgmentsGroupingTracker - XYZ flush 2 individual ack and 1 batch index ack {}
java.lang.IllegalStateException: Object has been recycled already.
at io.netty.util.Recycler$DefaultHandle.toAvailable(Recycler.java:287)
at io.netty.util.Recycler$LocalPool.release(Recycler.java:340)
at io.netty.util.Recycler$DefaultHandle.recycle(Recycler.java:260)
at org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable.recycle(ConcurrentBitSetRecyclable.java:58)
at org.apache.pulsar.common.protocol.Commands.newMultiMessageAckCommon(Commands.java:1063)
at org.apache.pulsar.common.protocol.Commands.newMultiMessageAck(Commands.java:1073)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.newMessageAckCommandAndWrite(PersistentAcknowledgmentsGroupingTracker.java:615)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flushAsync(PersistentAcknowledgmentsGroupingTracker.java:507)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:426)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualAck(PersistentAcknowledgmentsGroupingTracker.java:268)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$addAcknowledgment$3(PersistentAcknowledgmentsGroupingTracker.java:234)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addIndividualAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:218)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:232)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198)
at org.apache.pulsar.client.impl.AcknowledgementsGroupingTrackerTest.testConcurrentFlush(AcknowledgementsGroupingTrackerTest.java:442)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.testng.TestRunner.privateRun(TestRunner.java:829)
at org.testng.TestRunner.run(TestRunner.java:602)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
at org.testng.SuiteRunner.run(SuiteRunner.java:330)
Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!
Metadata
Metadata
Assignees
Labels
type/bug: The PR fixed a bug or issue reported a bug