Skip to content

Conversation

@poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jul 29, 2025

Motivation

Background: how ZK metadata store handles the node exists error when creating a new node
ZK metadata store throws a BadVersionException error if it receives a NODEEXISTS error when creating a new node.

https://github.com/apache/pulsar/blob/v4.0.5/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L269-L272

if (code == Code.NODEEXISTS) {
    // We're emulating a request to create a node, so the version is invalid
    op.getFuture().completeExceptionally(getException(Code.BADVERSION, op.getPath()));
}

Background: how ZK metadata store handles disconnect error when executing operations
ZK metadata store will retry the operation if it receives a CONNECTIONLOSS error when executing operations.

https://github.com/apache/pulsar/blob/v4.0.5/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L200-L227

if (code == Code.CONNECTIONLOSS) {
    // Retry with the individual operations
    executor.schedule(() -> {
        ops.forEach(o -> batchOperation(Collections.singletonList(o)));
    }, 100, TimeUnit.MILLISECONDS);
}

Background: how ZK clients/servers limit the max length of packet

  • ZK clients/servers limit the max length of a packet to 10MB by default, which was set in bin/pulsar.sh
  • Users can use a Java environment variable jute.maxbuffer to change the maximum packet length, which controls the maximum packet length for both requests and receives.
  • Once the maximum packet length is reached, the ZK client will throw an error and reconnect.

Issue: ZK client throws an error and reconnects when the max length of the packet is exceeded by the server

  • As the background "How ZK clients/servers limit the max length of packet" said, not only is the request packet is in limiting, but the response packet is not limited.
  • The requests that type Get or GET_CHILDREN will receive a response from the server, and the response packet may be larger than the max length of packet length.
  • When the response packet is larger than the maximum packet length, the ZK client will throw an error and reconnect.
  • The reconnecting will cause the BadVersionException error mentioned in the background "How ZK metadata store handles the node exists error when creating a new node".
  • Then the operation of creating a node can never be completed.

You can reproduce the issue by running the new test testReceivedHugeResponse

2025-07-30T02:00:53,768 - WARN  - [bookkeeper-ml-scheduler-OrderedScheduler-6-0:ManagedCursorImpl] - [public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e] Failed to update cursor metadata for s1 due to version conflict org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
2025-07-30T02:00:53,782 - WARN  - [bookkeeper-ml-scheduler-OrderedScheduler-6-0:ManagedLedgerImpl] - [public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e] Failed to open cursor: ManagedCursorImpl{ledger=public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e, name=s1, ackPos=2:-1, readPos=2:0}
2025-07-30T02:00:53,782 - WARN  - [bookkeeper-ml-scheduler-OrderedScheduler-6-0:PersistentTopic] - [persistent://public/default/tp-9d89a816-a363-4607-bf0d-1deea4fe566e] Failed to create subscription for s1: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
2025-07-30T02:00:53,782 - ERROR - [bookkeeper-ml-scheduler-OrderedScheduler-6-0:PersistentTopic] - [persistent://public/default/tp-9d89a816-a363-4607-bf0d-1deea4fe566e] Failed to create subscription: s1
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1141) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$3.openCursorFailed(PersistentTopic.java:1153) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$6.operationFailed(ManagedLedgerImpl.java:1048) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$3.operationFailed(ManagedCursorImpl.java:813) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$25$1.operationComplete(ManagedCursorImpl.java:2923) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$25$1.operationComplete(ManagedCursorImpl.java:2918) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$asyncGetCursorInfo$13(MetaStoreImpl.java:226) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:718) [?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java) [?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [bookkeeper-common-4.17.2.jar:4.17.2]
	at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) [bookkeeper-common-4.17.2.jar:4.17.2]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.122.Final.jar:4.1.122.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
	... 19 more
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:516) ~[classes/:?]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:292) ~[classes/:?]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:258) ~[classes/:?]
	... 8 more
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers/public/default/persistent/tp-9d89a816-a363-4607-bf0d-1deea4fe566e/s1
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:121) ~[zookeeper-3.9.3.jar:3.9.3]
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:53) ~[zookeeper-3.9.3.jar:3.9.3]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:512) ~[classes/:?]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:292) ~[classes/:?]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:258) ~[classes/:?]
	... 8 more

Modifications

Add a mechanism to limit get/getChildren operations of the ZK metadata store. This mechanism merely avoids problems with as few resources as possible, but it cannot prevent all problems. To solve all the problems, there will be a PIP later to make the Settings configurable, see also the section "Next things to do"

Next things to do

I will submit a PIP to do the following things

  • Make MetadataNodePayloadLenEstimator configurable
  • Add metrics of the node path that has the max packet length of each type
  • Fix the issue of out-of-order command execution in the metadata store. I will describe this problem in detail in PIP
  • Solve the following issue

The Metadata store can not confirm whether the bad version error is expected or not after a reconnection.

  • ZK client sent a Put request.
    • Server has received the request and is handling it.
    • Server is responding.
  • The channel reconnected before the Put request was responded.
  • The Put request is sent again, but the node already exists(or already modified), so Pulsar get a BadVersionException error.
    • (Highlight) The previous change is caused by the first try, but the second try is stuck due to the ZK node has been changed by the first try.
  • Once this situation happens, we can only restart the broker or reload topics to fix it, since we do not know whether the bad version error is expected or not.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.1.0 milestone Jul 29, 2025
@poorbarcode poorbarcode self-assigned this Jul 29, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 29, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there's another way with ZooKeeper which this fix is targeting.

In current implementation the async version of "multi" is used:

protected void batchOperation(List<MetadataOp> ops) {
try {
zkc.multi(ops.stream().map(this::convertOp).collect(Collectors.toList()), (rc, path, ctx, results) -> {
if (results == null) {
Code code = Code.get(rc);
if (code == Code.CONNECTIONLOSS) {
// There is the chance that we caused a connection reset by sending or requesting a batch
// that passed the max ZK limit.

The ZooKeeper client also contains a synchronous version:
https://javadoc.io/doc/org.apache.zookeeper/zookeeper/latest/org/apache/zookeeper/ZooKeeper.html#multi(java.lang.Iterable)
Using the async multi API causes the problem in the first place. It would be possible to retry the reads by splitting the batch etc until it succeeds.

@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode requested a review from lhotari July 31, 2025 02:03
@lhotari
Copy link
Member

lhotari commented Jul 31, 2025

I think that there's another way with ZooKeeper which this fix is targeting.

In current implementation the async version of "multi" is used:

protected void batchOperation(List<MetadataOp> ops) {
try {
zkc.multi(ops.stream().map(this::convertOp).collect(Collectors.toList()), (rc, path, ctx, results) -> {
if (results == null) {
Code code = Code.get(rc);
if (code == Code.CONNECTIONLOSS) {
// There is the chance that we caused a connection reset by sending or requesting a batch
// that passed the max ZK limit.

The ZooKeeper client also contains a synchronous version: https://javadoc.io/doc/org.apache.zookeeper/zookeeper/latest/org/apache/zookeeper/ZooKeeper.html#multi(java.lang.Iterable) Using the async multi API causes the problem in the first place. It would be possible to retry the reads by splitting the batch etc until it succeeds.

I checked this and it won't be useful since it would require that pipelining wouldn't be used, which would impact performance a lot. It seems that stats/estimation based approach is the way to go to mitigate the problem.

@poorbarcode I noticed that ZooKeeper has getNumChildren() in the stats for ZNodes. Perhaps it would be useful to use that before a namespace listing is performed? That would add latency to the first time the namespace listing is performed, but the number of children could be cached after that. The number of topics in a namespace could vary significantly so that's probably the main source of the problems.

Another source of problems in Pulsar is the ZNode data size for ml cursors (ManagedCursorInfo). I guess that stats/estimation based approach would work very well for that. It would be possible to set the estimated size high so that such operations would be executed in small batches.
Perhaps there could be tunables for configuring the initial estimates for different znode categories?

@poorbarcode poorbarcode requested a review from lhotari August 1, 2025 10:47
@codelipenghui
Copy link
Contributor

ZK clients/servers limit the max length of a packet to 1MB by default.

In Pulsar, we have added -Djute.maxbuffer=10485760" which change the max length to 10MB. Please help update the PR description to avoid the confusion

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just had a online sync with @poorbarcode , here is the summary

  1. This PR will not 100% fix the issue, but it will provide a better metadata batch abstraction and implementation to handle large response from batch get or list request
  2. For this PR, it's better to have a MetadataBatchStrategy abstraction instead of MetadataNodePayloadLenEstimator. So that we can have different batch strategy implementations.
  3. The MetadataBatchStrategy implementation can leverage the children size of the znode to decide batch the list request or not, and for get request we can continue use the metrics (max response size of the znode path) to decide how many get request will be batched

For further improvement or fix, we can also consider follow potential solution (need to double confirm)

  1. Use different session for list and get request for non-ephemeral nodes to avoid the write impact due to large response size
  2. Add session ID to the znode, so that the broker can know the request was succeed and no need to retry
  3. And we can also consider to have hierarchical metadata organization for managed ledgers, partitioned topic for the super big Pulsar cluster (optional since it will bring more complexity to Pulsar)

@github-actions github-actions bot added the PIP label Aug 14, 2025
@poorbarcode
Copy link
Contributor Author

@lhotari @codelipenghui

I have rewritten with a new solution, could you please review again?

@coderzc coderzc modified the milestones: 4.1.0, 4.2.0 Sep 1, 2025
@coderzc coderzc removed the PIP label Sep 4, 2025
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, please fix the checkstyle issue @poorbarcode

@codecov-commenter
Copy link

codecov-commenter commented Sep 11, 2025

Codecov Report

❌ Patch coverage is 93.89313% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.21%. Comparing base (cde4948) to head (58728bb).
⚠️ Report is 65 commits behind head on master.

Files with missing lines Patch % Lines
...g/apache/pulsar/metadata/impl/ZKMetadataStore.java 72.72% 4 Missing and 2 partials ⚠️
...pulsar/zookeeper/DefaultMetadataNodeSizeStats.java 97.82% 1 Missing and 2 partials ⚠️
...ta/impl/batching/ZKMetadataStoreBatchStrategy.java 94.00% 2 Missing and 1 partial ⚠️
...pl/batching/DefaultMetadataStoreBatchStrategy.java 90.47% 1 Missing and 1 partial ⚠️
...ulsar/metadata/api/DummyMetadataNodeSizeStats.java 83.33% 1 Missing ⚠️
...e/pulsar/metadata/impl/oxia/OxiaMetadataStore.java 66.66% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #24580       +/-   ##
=============================================
+ Coverage     36.00%   74.21%   +38.21%     
- Complexity    12799    33604    +20805     
=============================================
  Files          1825     1900       +75     
  Lines        142779   148342     +5563     
  Branches      16393    17202      +809     
=============================================
+ Hits          51402   110097    +58695     
+ Misses        84275    29469    -54806     
- Partials       7102     8776     +1674     
Flag Coverage Δ
inttests 26.48% <74.04%> (?)
systests 22.71% <69.84%> (-0.60%) ⬇️
unittests 73.75% <93.89%> (+39.32%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...n/java/org/apache/pulsar/broker/PulsarService.java 84.65% <100.00%> (+17.43%) ⬆️
...he/pulsar/metadata/impl/AbstractMetadataStore.java 85.66% <100.00%> (+35.13%) ⬆️
...pulsar/metadata/impl/LocalMemoryMetadataStore.java 92.30% <100.00%> (+38.90%) ⬆️
...che/pulsar/metadata/impl/RocksdbMetadataStore.java 70.94% <100.00%> (+70.60%) ⬆️
...ta/impl/batching/AbstractBatchedMetadataStore.java 91.02% <100.00%> (+21.13%) ⬆️
...ulsar/metadata/api/DummyMetadataNodeSizeStats.java 83.33% <83.33%> (ø)
...e/pulsar/metadata/impl/oxia/OxiaMetadataStore.java 87.91% <66.66%> (+87.91%) ⬆️
...pl/batching/DefaultMetadataStoreBatchStrategy.java 90.47% <90.47%> (ø)
...pulsar/zookeeper/DefaultMetadataNodeSizeStats.java 97.82% <97.82%> (ø)
...ta/impl/batching/ZKMetadataStoreBatchStrategy.java 94.00% <94.00%> (ø)
... and 1 more

... and 1436 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@poorbarcode poorbarcode merged commit 1cb64a9 into apache:master Sep 11, 2025
51 checks passed
poorbarcode added a commit that referenced this pull request Sep 11, 2025
…received a large response from ZK (#24580)

(cherry picked from commit 1cb64a9)
poorbarcode added a commit that referenced this pull request Sep 11, 2025
…received a large response from ZK (#24580)

(cherry picked from commit 1cb64a9)
poorbarcode added a commit that referenced this pull request Sep 11, 2025
…received a large response from ZK (#24580)

(cherry picked from commit 1cb64a9)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
…received a large response from ZK (apache#24580)

(cherry picked from commit 1cb64a9)
(cherry picked from commit 28e1874)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
…received a large response from ZK (apache#24580)

(cherry picked from commit 1cb64a9)
(cherry picked from commit 28e1874)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
…received a large response from ZK (apache#24580)

(cherry picked from commit 1cb64a9)
(cherry picked from commit 9b27ba2)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
…received a large response from ZK (apache#24580)

(cherry picked from commit 1cb64a9)
(cherry picked from commit 9b27ba2)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
…received a large response from ZK (apache#24580)

(cherry picked from commit 1cb64a9)
(cherry picked from commit 9b27ba2)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants