
[Bug] Batch index ACK might recycle ConcurrentBitSetRecyclable twice #24724

@BewareMyPower

Description

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

Issue Description

PersistentAcknowledgmentsGroupingTracker#flush can be called concurrently from two different threads: the scheduled task that runs every acknowledgementsGroupTimeMicros, and the thread that calls Consumer#acknowledgeAsync (which flushes once the number of pending acks reaches maxAcknowledgmentGroupSize).
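To make the two call paths concrete, here is a heavily simplified model. The class GroupingTrackerModel and its methods are hypothetical and are not Pulsar's API: a timer thread calls flush() every groupTimeMicros, and add() calls flush() on the caller's thread once maxGroupSize entries are pending. In this sketch flush() is serialized with a ReentrantLock, which is one way to keep the two paths from walking the map at the same time. The reproduction test later in this issue turns off ack receipt with the comment "disable the lock when flushing", which is why the race is reachable there.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified model of the two flush call paths (not Pulsar's class).
class GroupingTrackerModel implements AutoCloseable {
    private final Map<Long, Object> pending = new ConcurrentHashMap<>();
    private final List<Object> sent = new CopyOnWriteArrayList<>(); // stand-in for acks written to the broker
    private final ReentrantLock flushLock = new ReentrantLock();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final int maxGroupSize;

    GroupingTrackerModel(int maxGroupSize, long groupTimeMicros) {
        this.maxGroupSize = maxGroupSize;
        // Path 1: periodic flush on the timer thread.
        timer.scheduleWithFixedDelay(this::flush, groupTimeMicros, groupTimeMicros, TimeUnit.MICROSECONDS);
    }

    void add(long id) {
        pending.put(id, new Object());
        // Path 2: flush on the caller's thread once the group is full.
        if (pending.size() >= maxGroupSize) {
            flush();
        }
    }

    void flush() {
        // Serializes both paths; without it, two threads could iterate `pending` at once
        // and both pick up the same value before either of them removes it.
        flushLock.lock();
        try {
            Iterator<Map.Entry<Long, Object>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                sent.add(it.next().getValue());
                it.remove();
            }
        } finally {
            flushLock.unlock();
        }
    }

    int sentCount() {
        return sent.size();
    }

    // True if no pending value was sent more than once (compared by identity).
    boolean noDuplicates() {
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Object o : sent) {
            if (!seen.add(o)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void close() {
        timer.shutdown(); // cancels the periodic flush
        try {
            timer.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(); // drain whatever is still pending
    }
}
```

With the lock in place, every value added is sent exactly once no matter which path flushes it; removing the lock/unlock pair reopens the window described next.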

When flush is called concurrently, access to pendingIndividualBatchIndexAcks is not thread safe:

    if (!pendingIndividualBatchIndexAcks.isEmpty()) {
        Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> iterator =
                pendingIndividualBatchIndexAcks.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next();
            entriesToAck.add(Triple.of(
                    entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue()));
            iterator.remove();
        }
    }

For example, consider this interleaving:

  • Thread 1: adds the ConcurrentBitSetRecyclable to its entriesToAck
  • Thread 2: adds the same ConcurrentBitSetRecyclable to its entriesToAck
  • Thread 1: removes the entry that holds that bit set
  • Thread 2: tries to remove the same entry, which is already gone, so the removal has no effect
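The interleaving above can be replayed deterministically on a single thread by driving two iterators over the same map, with it1 and it2 playing Thread 1 and Thread 2. This sketch assumes pendingIndividualBatchIndexAcks behaves like a java.util.concurrent.ConcurrentHashMap, whose iterator remove() silently does nothing when the entry is already gone. FakeBitSet is a hypothetical stand-in for ConcurrentBitSetRecyclable that rejects a second recycle(), mimicking the check seen in the stack trace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DoubleRecycleReplay {
    // Hypothetical stand-in for ConcurrentBitSetRecyclable: a second recycle() is rejected,
    // similar to the "Object has been recycled already." error from Netty's Recycler.
    static final class FakeBitSet {
        private boolean recycled;

        void recycle() {
            if (recycled) {
                throw new IllegalStateException("Object has been recycled already.");
            }
            recycled = true;
        }
    }

    // Replays the four steps in order and returns each "thread's" entriesToAck list.
    static List<List<FakeBitSet>> replay() {
        Map<Long, FakeBitSet> pending = new ConcurrentHashMap<>();
        pending.put(0L, new FakeBitSet());
        Iterator<Map.Entry<Long, FakeBitSet>> it1 = pending.entrySet().iterator();
        Iterator<Map.Entry<Long, FakeBitSet>> it2 = pending.entrySet().iterator();
        List<FakeBitSet> acks1 = new ArrayList<>();
        List<FakeBitSet> acks2 = new ArrayList<>();

        acks1.add(it1.next().getValue()); // Thread 1: add the bit set to entriesToAck
        acks2.add(it2.next().getValue()); // Thread 2: add the same bit set
        it1.remove();                     // Thread 1: remove the entry
        it2.remove();                     // Thread 2: entry already gone, so this is a no-op
        return List.of(acks1, acks2);
    }

    public static void main(String[] args) {
        List<List<FakeBitSet>> acks = replay();
        acks.get(0).forEach(FakeBitSet::recycle); // first recycle succeeds
        acks.get(1).forEach(FakeBitSet::recycle); // second recycle throws IllegalStateException
    }
}
```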

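As a result, both threads hold the same ConcurrentBitSetRecyclable object in their entriesToAck lists, and each of them calls recycle() on it (in Commands.newMultiMessageAckCommon, per the stack trace below), so the object is recycled twice.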
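One possible direction for a fix, sketched under the same ConcurrentHashMap assumption and not necessarily what was merged upstream, is to let ConcurrentHashMap#remove(key) decide ownership instead of iterating and then calling iterator.remove(). Because remove(key) is atomic, only one caller gets a non-null value for a given mapping, so each bit set would end up in exactly one entriesToAck list. ExclusiveDrain.drain is a hypothetical helper, not an existing Pulsar method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class ExclusiveDrain {
    // Drains `pending` so that each value is handed to exactly one caller, even when several
    // threads drain concurrently: remove(key) is atomic, so only one of them gets non-null.
    static <K, V> List<V> drain(ConcurrentHashMap<K, V> pending) {
        List<V> owned = new ArrayList<>();
        for (K key : pending.keySet()) {
            V value = pending.remove(key);
            if (value != null) { // null means another thread already claimed this mapping
                owned.add(value);
            }
        }
        return owned;
    }
}
```

Compared with serializing the whole flush under a lock, this only guarantees a single owner per value; it does not order the resulting ack commands between the two threads.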

Error messages


Reproducing the issue

It's hard to reproduce without injecting a delay. Here is a patch that applies on top of commit 7547fab:

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 5f7957d7f1..f982108c1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -445,6 +445,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         // Flush all individual acks
         List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
                 new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
+        log.info("XYZ flush {} individual ack and {} batch index ack", pendingIndividualAcks.size(),
+                pendingIndividualBatchIndexAcks.size());
         if (!pendingIndividualAcks.isEmpty()) {
             if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 // We can send 1 single protobuf command with all individual acks
@@ -492,6 +494,10 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                 Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next();
                 entriesToAck.add(Triple.of(
                         entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue()));
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ignored) {
+                }
                 iterator.remove();
             }
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index bbf1265489..7c28af5e3e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -419,6 +419,30 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }

+    @Test
+    public void testConcurrentFlush() throws Exception {
+        when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE);
+        final var conf = new ConsumerConfigurationData<byte[]>();
+        conf.setBatchIndexAckEnabled(true);
+        conf.setMaxAcknowledgmentGroupSize(2);
+        conf.setAckReceiptEnabled(false); // disable the lock when flushing
+        conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(1));
+        final var tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+
+        final var batchSize = 3;
+        final var bitSet = new BitSet(batchSize);
+        bitSet.set(0, batchSize);
+        tracker.addAcknowledgment(new BatchMessageIdImpl(0L, 0L, -1, 0, batchSize, bitSet), AckType.Individual, null);
+
+        // Wait for the flush
+        Thread.sleep(1200);
+
+        // Trigger the flush again
+        for (int i = 0; i < 2; i++) {
+            tracker.addAcknowledgment(new MessageIdImpl(0L, 1L + i, -1), AckType.Individual, null);
+        }
+    }
+
     public class ClientCnxTest extends ClientCnx {

         public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {

Run this test and you will see output similar to the following:

2025-09-10T21:10:13,921+0800 INFO  [nioEventLoopGroup-2-1] o.a.p.c.i.PersistentAcknowledgmentsGroupingTracker - XYZ flush 0 individual ack and 1 batch index ack {}
2025-09-10T21:10:14,124+0800 INFO  [main] o.a.p.c.i.PersistentAcknowledgmentsGroupingTracker - XYZ flush 2 individual ack and 1 batch index ack {}

java.lang.IllegalStateException: Object has been recycled already.

	at io.netty.util.Recycler$DefaultHandle.toAvailable(Recycler.java:287)
	at io.netty.util.Recycler$LocalPool.release(Recycler.java:340)
	at io.netty.util.Recycler$DefaultHandle.recycle(Recycler.java:260)
	at org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable.recycle(ConcurrentBitSetRecyclable.java:58)
	at org.apache.pulsar.common.protocol.Commands.newMultiMessageAckCommon(Commands.java:1063)
	at org.apache.pulsar.common.protocol.Commands.newMultiMessageAck(Commands.java:1073)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.newMessageAckCommandAndWrite(PersistentAcknowledgmentsGroupingTracker.java:615)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flushAsync(PersistentAcknowledgmentsGroupingTracker.java:507)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:426)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualAck(PersistentAcknowledgmentsGroupingTracker.java:268)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$addAcknowledgment$3(PersistentAcknowledgmentsGroupingTracker.java:234)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addIndividualAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:218)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:232)
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198)
	at org.apache.pulsar.client.impl.AcknowledgementsGroupingTrackerTest.testConcurrentFlush(AcknowledgementsGroupingTrackerTest.java:442)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)

Additional information

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels

type/bug