Skip to content

Conversation

@codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jul 2, 2025

Issue Description (on version 3.0.10)

  • Location: MultiTopicsConsumerImpl.java:581 in the negativeAcknowledge(Message<?> message) method
  • Error: java.lang.NullPointerException
  • Root Cause: Race condition between closeAsync() and negativeAcknowledge() methods

Technical Details

What was happening:

  1. closeAsync() sets consumer state to Closing/Closed and calls cleanupMultiConsumer()
  2. cleanupMultiConsumer() sets unAckedMessageTracker = null (line 685)
  3. Concurrently, negativeAcknowledge() methods were still being called
  4. These methods accessed unAckedMessageTracker.remove(messageId) without checking if it was null
  5. This caused NullPointerException at line 581

Race condition timeline:
Thread 1: closeAsync() → setState(Closed) → cleanupMultiConsumer() → unAckedMessageTracker = null
Thread 2: negativeAcknowledge() → unAckedMessageTracker.remove() → NPE!

The original stacktrace:

  Stacktrace:
  java.lang.NullPointerException: null
  class java.lang.NullPointerException: null
      at
  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.negativeAcknowledge(MultiTopicsConsumerImpl.java:581)
      at com.company.pulsar.FutureAsyncHandler.negativeAcknowledgeAsync(FutureAsyncHandler.scala:98)
      at com.company.pulsar.FutureAsyncHandler.negativeAcknowledgeAsync(FutureAsyncHandler.scala:25)
      at com.company.pulsar.consumer.DefaultConsumer.negativeAcknowledgeAsync(Consumer.scala:135)
      at com.company.pulsar.consumer.PulsarCommittableSourceGraphStage$CommittableMessageImpl.nack(CommittableSour
  ce.scala:134)
      at com.company.pulsar.PulsarTracer$TracedCommittableMessage.$anonfun$nack$1(PulsarTracer.scala:127)
      at com.company.pulsar.PulsarTracer$TracedCommittableMessage.$anonfun$wrapReply$1(PulsarTracer.scala:112)
      at com.company.domainapi.tracer.TraceFactory.$anonfun$decoratedOrigFn$1(TraceFactory.scala:93)
      at com.company.tracer.TraceFactoryImpl.buildSpanAroundFuture(TraceFactoryImpl.scala:197)
      at com.company.tracer.TraceFactoryImpl.$anonfun$tracedFt$1(TraceFactoryImpl.scala:123)
      at com.company.domainapi.tracer.TraceFactory.wrapChildCreation(TraceFactory.scala:161)
      at com.company.domainapi.tracer.TraceFactory.wrapChildCreation$(TraceFactory.scala:150)
      at com.company.tracer.TraceFactoryImpl.wrapChildCreation(TraceFactoryImpl.scala:29)
      at com.company.tracer.TraceFactoryImpl.tracedFt(TraceFactoryImpl.scala:123)
      at com.company.tracer.SpanTrace.childFt(BaseTrace.scala:121)
      at com.company.pulsar.PulsarTracer$TracedCommittableMessage.wrapReply(PulsarTracer.scala:108)
      at com.company.pulsar.PulsarTracer$TracedCommittableMessage.nack(PulsarTracer.scala:127)
      at com.company.pulsar.PulsarConsumerService$anonfun$nestedInanonfun$enqueueAndStartTask$3$1.applyOrElse(Puls
  arConsumerService.scala:749)
      at com.company.pulsar.PulsarConsumerService$anonfun$nestedInanonfun$enqueueAndStartTask$3$1.applyOrElse(Puls
  arConsumerService.scala:746)
      at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:490)
      at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
      at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
      at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
      at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
      at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
      at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurato
  r.scala:57)
      at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
      at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
      at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
      at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
      at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

Modification

Make negativeAcknowledge return immediately if the consumer is not Ready instead of throwing exceptions for the follow reasons

  • Throwing exception will break the API since we didn't declare the PulsarClientException for the method signature.
  • If the consumer gets closed, all the unacked messages will be redelivered automatically. Users don't need to care about the failures of negativeAcknowledge() in this case.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codelipenghui codelipenghui self-assigned this Jul 2, 2025
@codelipenghui codelipenghui added this to the 4.1.0 milestone Jul 2, 2025
@codelipenghui codelipenghui added release/4.0.6 release/3.0.13 release/3.3.8 type/bug The PR fixed a bug or issue reported a bug doc-not-needed Your PR changes do not impact docs area/client ready-to-test labels Jul 2, 2025
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codecov-commenter
Copy link

codecov-commenter commented Jul 2, 2025

Codecov Report

Attention: Patch coverage is 0% with 8 lines in your changes missing coverage. Please review.

Project coverage is 74.28%. Comparing base (bbc6224) to head (9b9eb57).
Report is 1181 commits behind head on master.

Files with missing lines Patch % Lines
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 0.00% 6 Missing and 2 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24476      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.70%     
- Complexity    32624    32769     +145     
============================================
  Files          1877     1868       -9     
  Lines        139502   145631    +6129     
  Branches      15299    16675    +1376     
============================================
+ Hits         102638   108181    +5543     
+ Misses        28908    28871      -37     
- Partials       7956     8579     +623     
Flag Coverage Δ
inttests 26.69% <0.00%> (+2.11%) ⬆️
systests 23.37% <0.00%> (-0.95%) ⬇️
unittests 73.77% <0.00%> (+0.92%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 77.93% <0.00%> (+0.21%) ⬆️

... and 1088 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@nodece nodece merged commit 8916730 into apache:master Jul 3, 2025
73 of 83 checks passed
@codelipenghui codelipenghui deleted the penghui/fix-npe branch July 3, 2025 05:06
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Jul 15, 2025
lhotari pushed a commit that referenced this pull request Jul 17, 2025
lhotari pushed a commit that referenced this pull request Jul 17, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 23, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 24, 2025
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants