@dongzhonghua
Contributor

Motivation

AvgShedder throws an exception when sorting brokers by score.

2025-11-05T17:11:24,894+0800 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.LoadSheddingTask.run(LoadSheddingTask.java:59) - Error during the load shedding
java.lang.IllegalArgumentException: Comparison method violates its general contract!        
        at java.base/java.util.TimSort.mergeHi(TimSort.java:903) ~[?:?]
        at java.base/java.util.TimSort.mergeAt(TimSort.java:520) ~[?:?]
        at java.base/java.util.TimSort.mergeForceCollapse(TimSort.java:461) ~[?:?]
        at java.base/java.util.TimSort.sort(TimSort.java:254) ~[?:?]
        at java.base/java.util.Arrays.sort(Arrays.java:1307) ~[?:?]
        at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353) ~[?:?]
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510) ~[?:?]
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
        at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622) ~[?:?]
        at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627) ~[?:?]
        at org.apache.pulsar.broker.loadbalance.impl.AvgShedder.calculateScoresAndSort(AvgShedder.java:228) ~[org.apache.pulsar-pulsar-broker-3.3.6.13-kwai.jar:3.3.6.13-kwai]
        at org.apache.pulsar.broker.loadbalance.impl.AvgShedder.findBundlesForUnloading(AvgShedder.java:79) ~[org.apache.pulsar-pulsar-broker-3.3.6.13-kwai.jar:3.3.6.13-kwai]
        at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.doLoadShedding(ModularLoadManagerImpl.java:652) ~[org.apache.pulsar-pulsar-broker-3.3.6.13-kwai.jar:3.3.6.13-kwai]
        at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper.doLoadShedding(ModularLoadManagerWrapper.java:52) ~[org.apache.pulsar-pulsar-broker-3.3.6.13-kwai.jar:3.3.6.13-kwai]
        at org.apache.pulsar.broker.loadbalance.LoadSheddingTask.run(LoadSheddingTask.java:57) ~[org.apache.pulsar-pulsar-broker-3.3.6.13-kwai.jar:3.3.6.13-kwai]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) [?:?]

The exception originates in the comparator used by calculateScoresAndSort:

    private List<String> calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf) {
        brokerScoreMap.clear();

        // calculate scores of brokers.
        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
            LocalBrokerData localBrokerData = entry.getValue().getLocalData();
            String broker = entry.getKey();
            Double score = calculateScores(localBrokerData, conf);
            brokerScoreMap.put(broker, score);
            if (log.isDebugEnabled()) {
                log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score,
                        localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(),
                        localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut());
            }
        }

        // sort brokers by scores.
        return brokerScoreMap.entrySet().stream().sorted((o1, o2) -> (int) (o1.getValue() - o2.getValue()))
                .map(Map.Entry::getKey).toList();
    }

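Here is why:

Casting the double difference o1.getValue() - o2.getValue() to int truncates toward zero and loses precision, so any two scores that differ by less than 1 compare as equal. This breaks the transitivity that the Comparator contract requires.
For example: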

brokerScoreMap.put("X", 0.999);
brokerScoreMap.put("Y", 1.0);
brokerScoreMap.put("Z", 1.999);

(int)(0.999 - 1.0) = (int)(-0.001) = 0 → X == Y
(int)(1.0 - 1.999) = (int)(-0.999) = 0 → Y == Z
(int)(0.999 - 1.999) = (int)(-1.0) = -1 → X < Z
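
The three comparisons above can be checked in isolation. This is a minimal standalone sketch, not code from AvgShedder: the class `TruncatingComparatorDemo` and its method `truncatingCompare` are hypothetical names introduced only for illustration. It applies the same subtract-then-cast logic to X, Y and Z and prints the results next to the sign reported by `Double.compare`.

```java
// Hypothetical demo class (not part of Pulsar) reproducing the example above.
class TruncatingComparatorDemo {

    // Same shape as the comparator in calculateScoresAndSort: subtract, then cast to int.
    static int truncatingCompare(double a, double b) {
        return (int) (a - b);
    }

    public static void main(String[] args) {
        double x = 0.999;
        double y = 1.0;
        double z = 1.999;

        // The cast truncates toward zero, so a difference smaller than 1 becomes 0.
        System.out.println("X vs Y: " + truncatingCompare(x, y)); // 0  (treated as equal)
        System.out.println("Y vs Z: " + truncatingCompare(y, z)); // 0  (treated as equal)
        System.out.println("X vs Z: " + truncatingCompare(x, z)); // -1 (X < Z)

        // Double.compare keeps the sign of every difference, so the order is consistent.
        System.out.println("X < Y: " + (Double.compare(x, y) < 0)); // true
        System.out.println("Y < Z: " + (Double.compare(y, z) < 0)); // true
        System.out.println("X < Z: " + (Double.compare(x, z) < 0)); // true
    }
}
```

With the cast, X and Y are "equal" and Y and Z are "equal", yet X is "less than" Z. TimSort can detect this kind of inconsistency during a merge and then throws the IllegalArgumentException shown in the stack trace.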

Modifications

Remove the double-to-int cast in the comparator. Use Comparator.comparingDouble instead.
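
A sketch of what a sort based on `Comparator.comparingDouble` could look like, assuming a `Map<String, Double>` with the same shape as `brokerScoreMap`. The class `ScoreSortSketch` and the method `sortByScore` are hypothetical names for illustration; this is not the merged diff itself.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch (not the actual AvgShedder code) of sorting brokers by score.
class ScoreSortSketch {

    // Returns the map keys ordered by ascending score.
    static List<String> sortByScore(Map<String, Double> scores) {
        // comparingDouble compares the extracted values with Double.compare,
        // so no narrowing cast is involved.
        Comparator<Map.Entry<String, Double>> byScore =
                Comparator.comparingDouble(e -> e.getValue());

        return scores.entrySet().stream()
                .sorted(byScore)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>();
        scores.put("Z", 1.999);
        scores.put("X", 0.999);
        scores.put("Y", 1.0);

        System.out.println(sortByScore(scores)); // [X, Y, Z]
    }
}
```

`Double.compare` defines a total order over all double values, including NaN, so the comparator satisfies the contract whatever scores the brokers report.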

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) on Nov 6, 2025
@lhotari added this to the 4.2.0 milestone on Nov 6, 2025
Member

@lhotari lhotari left a comment

LGTM

@lhotari lhotari requested a review from thetumbled November 6, 2025 15:44
@codecov-commenter

codecov-commenter commented Nov 6, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.24%. Comparing base (7c343d0) to head (43f689f).
⚠️ Report is 20 commits behind head on master.

@@            Coverage Diff            @@
##             master   #24954   +/-   ##
=========================================
  Coverage     74.24%   74.24%           
+ Complexity    33911    33892   -19     
=========================================
  Files          1913     1913           
  Lines        149510   149510           
  Branches      17373    17373           
=========================================
+ Hits         110997   111008   +11     
+ Misses        29685    29638   -47     
- Partials       8828     8864   +36     
Flag Coverage Δ
inttests 26.36% <0.00%> (+0.07%) ⬆️
systests 22.82% <0.00%> (+0.10%) ⬆️
unittests 73.77% <100.00%> (-0.01%) ⬇️

Files with missing lines Coverage Δ
...che/pulsar/broker/loadbalance/impl/AvgShedder.java 71.68% <100.00%> (ø)

... and 101 files with indirect coverage changes

@lhotari lhotari merged commit 186b503 into apache:master Nov 6, 2025
56 checks passed
lhotari pushed a commit that referenced this pull request Nov 11, 2025
Co-authored-by: dongzhonghua03 <dongzhonghua03@kuaishou.com>
(cherry picked from commit 186b503)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
Co-authored-by: dongzhonghua03 <dongzhonghua03@kuaishou.com>
(cherry picked from commit 186b503)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
Co-authored-by: dongzhonghua03 <dongzhonghua03@kuaishou.com>
(cherry picked from commit 186b503)
(cherry picked from commit e786cc2)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 14, 2025
Co-authored-by: dongzhonghua03 <dongzhonghua03@kuaishou.com>
(cherry picked from commit 186b503)
(cherry picked from commit e786cc2)
3 participants