[improve][broker] PIP-192 Improved Auto Unload Logic#19813
Conversation
Please pause the review. There will be more updates.
1af3e15 to 31a3c3c
Please continue the review.
Please resolve the conflicts and rebase it onto the master branch.
|
Yes, I will rebase after the PR for excluding bundles with policies is merged.
| Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare( | ||
| a.getValue().getWeightedMaxEMA(), | ||
| b.getValue().getWeightedMaxEMA())); |
nit:
| Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare( | |
| a.getValue().getWeightedMaxEMA(), | |
| b.getValue().getWeightedMaxEMA())); | |
| brokersSortedByLoad.sort(Comparator.comparingDouble(a -> a.getValue().getWeightedMaxEMA())); |
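The suggested one-liner should sort in the same ascending order as the original `Collections.sort` call. A minimal sketch of that equivalence follows; `LoadData` is a hypothetical stand-in for `BrokerLoadData` that models only the sort key, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class SortByLoadSketch {
    // Hypothetical stand-in for BrokerLoadData; only the sort key is modeled.
    static final class LoadData {
        private final double weightedMaxEMA;
        LoadData(double weightedMaxEMA) { this.weightedMaxEMA = weightedMaxEMA; }
        double getWeightedMaxEMA() { return weightedMaxEMA; }
    }

    // Original style: explicit lambda comparator built on Double.compare.
    static List<String> sortWithLambda(Map<String, LoadData> loads) {
        List<Map.Entry<String, LoadData>> entries = new ArrayList<>(loads.entrySet());
        Collections.sort(entries, (a, b) -> Double.compare(
                a.getValue().getWeightedMaxEMA(),
                b.getValue().getWeightedMaxEMA()));
        return keys(entries);
    }

    // Suggested style: List.sort with Comparator.comparingDouble.
    static List<String> sortWithComparingDouble(Map<String, LoadData> loads) {
        List<Map.Entry<String, LoadData>> entries = new ArrayList<>(loads.entrySet());
        entries.sort(Comparator.comparingDouble(e -> e.getValue().getWeightedMaxEMA()));
        return keys(entries);
    }

    // Collects the broker names in their sorted order.
    private static List<String> keys(List<Map.Entry<String, LoadData>> entries) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, LoadData> e : entries) {
            names.add(e.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, LoadData> loads = Map.of(
                "broker-a", new LoadData(0.9),
                "broker-b", new LoadData(0.2),
                "broker-c", new LoadData(0.5));
        System.out.println(sortWithLambda(loads));          // [broker-b, broker-c, broker-a]
        System.out.println(sortWithComparingDouble(loads)); // [broker-b, broker-c, broker-a]
    }
}
```

Both variants order the least-loaded broker first; the second only removes the boilerplate of spelling out the comparison.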
| return new NamespaceBundle(NamespaceName.get(namespace), hashRange, factory); | ||
| }).when(factory).getBundle(anyString(), anyString()); | ||
| doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any()); | ||
| doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager(); |
| doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager(); | |
| doReturn(new AtomicReference<>(loadManagerWrapper)).when(pulsar).getLoadManager(); |
|
|
||
| private PulsarService getMockPulsar() { | ||
| var pulsar = mock(PulsarService.class); | ||
| private PulsarService getMockPulsar(PulsarService pulsar) { |
This method is never used right now. Please remove it.
| public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException { | ||
| var pulsar = getMockPulsar(); | ||
| TransferShedder transferShedder = new TransferShedder(pulsar, antiAffinityGroupPolicyHelper); | ||
| //var pulsar = getMockPulsar(); |
| partitionSort(arr, topk); | ||
|
|
||
| for (int i = 0; i < topk; i++) { | ||
| for (int i = topk - 1; i >= 0; i--) { |
Why reverse the topK bundles order?
We want to transfer the lower-loaded bundles first to meet the target threshold. If the highest-loaded bundle carries too much traffic, we can't unload anything at all.
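The effect of the visiting order can be illustrated with a small greedy selection. This is only a sketch under stated assumptions, not the `TransferShedder` logic: the class name, `selectForUnload`, and the `budget` parameter are hypothetical, bundle loads are plain doubles, and the loop is assumed to stop at the first bundle that does not fit the remaining budget.

```java
import java.util.ArrayList;
import java.util.List;

class TopKUnloadOrderSketch {
    /**
     * Greedily picks bundle loads in the order given until the next bundle
     * would exceed the remaining offload budget (hypothetical stop rule).
     */
    static List<Double> selectForUnload(double[] loadsInVisitOrder, double budget) {
        List<Double> selected = new ArrayList<>();
        double remaining = budget;
        for (double load : loadsInVisitOrder) {
            if (load > remaining) {
                break; // assumed rule: stop at the first bundle that does not fit
            }
            selected.add(load);
            remaining -= load;
        }
        return selected;
    }

    public static void main(String[] args) {
        double budget = 10;
        // Highest-loaded first: the 50-unit bundle blocks the whole selection.
        System.out.println(selectForUnload(new double[] {50, 8, 5, 3}, budget)); // []
        // Lowest-loaded first: smaller bundles are moved before the budget runs out.
        System.out.println(selectForUnload(new double[] {3, 5, 8, 50}, budget)); // [3.0, 5.0]
    }
}
```

Under these assumptions, visiting the top-k bundles from lowest to highest still offloads some traffic even when the single hottest bundle exceeds the budget on its own.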
31a3c3c to 0ca5ae3
0ca5ae3 to f310c4f
Rebase is done. Please continue the review.
d619eb9 to cbac501
cbac501 to 1cc8aab
|
|
||
| @Override | ||
| public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { | ||
| executor.execute(() -> { |
Is there any reason to use the executor here? I did not see any blocking operation here.
The intention was to run handleEvent in a single-threaded manner. I agree that we don't have to use the executor here. Removing it.
| task = loadManagerExecutor.scheduleAtFixedRate(() -> { | ||
| try { | ||
| execute(); | ||
| if (conf.isLoadBalancerDebugModeEnabled()) { |
Why don't we use debugMode here?
| private MinMaxPriorityQueue<String> maxBrokers; | ||
| private LoadDataStore<BrokerLoadData> loadDataStore; | ||
|
|
||
| private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; |
| private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; | |
| private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; |
a9b4171 to 459631b
@Demogorgon314 can we merge this PR?
Master Issue: #16691, #18099
Motivation
Raising a PR to implement Master Issue: #16691, #18099
We want to reduce the frequency of unloads triggered by flaky traffic.
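One way to picture how a hit-count threshold combined with EMA smoothing can damp flaky traffic is the sketch below. It is not the PR's implementation: the class, the smoothing factor `alpha`, the load threshold, and the rule that the counter resets whenever the condition is not met are all assumptions made for illustration.

```java
class SheddingGateSketch {
    private final double alpha;          // hypothetical EMA smoothing factor in (0, 1]
    private final double loadThreshold;  // hypothetical smoothed load above which the condition "hits"
    private final int hitCountThreshold; // consecutive hits required before shedding is allowed
    private double ema;
    private boolean initialized;
    private int hitCount;

    SheddingGateSketch(double alpha, double loadThreshold, int hitCountThreshold) {
        this.alpha = alpha;
        this.loadThreshold = loadThreshold;
        this.hitCountThreshold = hitCountThreshold;
    }

    /** Feeds one load sample; returns true only when shedding should run this cycle. */
    boolean onSample(double sample) {
        // Exponential moving average: the first sample seeds the average.
        ema = initialized ? alpha * sample + (1 - alpha) * ema : sample;
        initialized = true;

        // Count consecutive cycles in which the smoothed load exceeds the threshold.
        if (ema > loadThreshold) {
            hitCount++;
        } else {
            hitCount = 0; // assumed rule: any quiet cycle resets the counter
        }
        if (hitCount >= hitCountThreshold) {
            hitCount = 0;
            return true;
        }
        return false;
    }

    double ema() {
        return ema;
    }

    public static void main(String[] args) {
        SheddingGateSketch gate = new SheddingGateSketch(0.5, 100, 3);
        // A single spike (300) followed by normal traffic never reaches three hits.
        for (double s : new double[] {50, 300, 50, 50}) {
            System.out.println(s + " -> shed=" + gate.onSample(s) + ", ema=" + gate.ema());
        }
        // Sustained high traffic eventually does.
        for (double s : new double[] {200, 200, 200}) {
            System.out.println(s + " -> shed=" + gate.onSample(s) + ", ema=" + gate.ema());
        }
    }
}
```

In this sketch a one-off spike raises the smoothed value for two cycles but is forgotten before the third hit, while sustained load trips the gate on the third consecutive hit.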
Modifications
This PR
- Adds loadBalancerSheddingConditionHitCountThreshold to further restrict shedding conditions based on the hit count.
- Changes the loadBalanceSheddingDelayInSeconds value from 600 to 180, as 10 minutes is too long; 3 minutes can be long enough to catch the new load after unloads.
- Replaces loadBalancerBundleLoadReportPercentage with loadBalancerMaxNumberOfBundlesInBundleLoadReport to make the top-k bundle count absolute instead of relative.
- Renames loadBalancerNamespaceBundleSplitConditionThreshold to loadBalancerNamespaceBundleSplitConditionHitCountThreshold to be consistent with *ConditionHitCountThreshold.
- Renames loadBalancerMaxNumberOfBrokerTransfersPerCycle to loadBalancerMaxNumberOfBrokerSheddingPerCycle.
- Adds msgThroughputEMA in BrokerLoadData to smooth the broker throughput info.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
We will have separate PRs to update the Doc later.
Matching PR in forked repository
PR in forked repository: heesung-sohn#39