[improve][broker] PIP-192 Improved Auto Unload Logic#19813

Merged
Demogorgon314 merged 11 commits into
apache:masterfrom
heesung-sohn:pip-192-unload-improve
Mar 29, 2023

Conversation

@heesung-sohn

@heesung-sohn heesung-sohn commented Mar 15, 2023

Contributor

Master Issue: #16691, #18099

Motivation

Raising a PR to implement Master Issue: #16691, #18099

We want to reduce how often bundles are unloaded because of flaky traffic.

Modifications

This PR

  • Introduced a config loadBalancerSheddingConditionHitCountThreshold to further restrict shedding conditions based on the hit count.
  • Normalized offload traffic
  • Lowered the default loadBalanceSheddingDelayInSeconds value from 600 to 180, because 10 minutes is too long; 3 minutes should be long enough to catch the new load after unloads.
  • Changed the config loadBalancerBundleLoadReportPercentage to loadBalancerMaxNumberOfBundlesInBundleLoadReport to make the topk bundle count absolute instead of relative.
  • Renamed loadBalancerNamespaceBundleSplitConditionThreshold to loadBalancerNamespaceBundleSplitConditionHitCountThreshold to be consistent with *ConditionHitCountThreshold.
  • Renamed loadBalancerMaxNumberOfBrokerTransfersPerCycle to loadBalancerMaxNumberOfBrokerSheddingPerCycle.
  • Added LoadDataStore cleanup logic in BSC monitor.
  • Added msgThroughputEMA in BrokerLoadData to smooth the broker throughput info.
  • Changed the top-k bundles to be sorted in ascending order (instead of descending).
  • Updated some info logs to show only in debug mode.
  • Added load data tombstone upon Own, Releasing, Splitting
  • Added the bundle ownership (isOwned) check upon split and unload.
  • Added swap unload logic
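
Two of the items above, the hit-count threshold and the throughput EMA, can be illustrated with a minimal sketch. This is not the code from this PR: the class and method names are hypothetical, the smoothing factor `alpha` is an assumed parameter, and resetting the count on a miss (so that only consecutive hits count) is an assumption about how such a threshold might behave.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; not the Pulsar implementation. */
final class SheddingSketch {

    /** Exponential moving average: alpha is the weight given to the newest sample. */
    static double ema(double previous, double sample, double alpha) {
        return alpha * sample + (1 - alpha) * previous;
    }

    /** Tracks how many cycles in a row each broker has met the shedding condition. */
    static final class HitCounter {
        private final int threshold;
        private final Map<String, Integer> hits = new HashMap<>();

        HitCounter(int threshold) {
            this.threshold = threshold;
        }

        /** Returns true only once the condition has held for `threshold` cycles in a row. */
        boolean shouldShed(String broker, boolean conditionMet) {
            if (!conditionMet) {
                hits.remove(broker); // assumption: a miss resets the streak
                return false;
            }
            int count = hits.merge(broker, 1, Integer::sum);
            return count >= threshold;
        }
    }

    public static void main(String[] args) {
        HitCounter counter = new HitCounter(3);
        boolean[] overloaded = {true, true, false, true, true, true};
        // Prints false five times, then true: the miss in cycle 3 resets the streak.
        for (boolean o : overloaded) {
            System.out.println(counter.shouldShed("broker-1", o));
        }
    }
}
```

With a threshold of 3, a single noisy cycle is not enough to trigger shedding, which is the kind of flaky-traffic filtering the motivation describes.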

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Updated unit tests.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

We will have separate PRs to update the Doc later.

Matching PR in forked repository

PR in forked repository: heesung-sohn#39

@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Mar 15, 2023
@Demogorgon314 Demogorgon314 self-requested a review March 15, 2023 03:14
@heesung-sohn

Contributor Author

Please pause the review. There will be more updates.

@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch 4 times, most recently from 1af3e15 to 31a3c3c Compare March 17, 2023 23:19
@heesung-sohn

Contributor Author

Please pause the review. There will be more updates.

Please continue the review.

@Demogorgon314

Member

Please resolve the conflicts and rebase it into the master branch.

@heesung-sohn

Contributor Author

Yes, I will rebase after the pr for excluding bundles with policies.

@Demogorgon314 Demogorgon314 added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker labels Mar 20, 2023
Comment on lines +244 to +246
Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare(
a.getValue().getWeightedMaxEMA(),
b.getValue().getWeightedMaxEMA()));

Member

nit:

Suggested change
- Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare(
- a.getValue().getWeightedMaxEMA(),
- b.getValue().getWeightedMaxEMA()));
+ brokersSortedByLoad.sort(Comparator.comparingDouble(a -> a.getValue().getWeightedMaxEMA()));

Contributor Author

updated.
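
The suggestion in this thread swaps an explicit `Double.compare` lambda for `Comparator.comparingDouble`, which sorts in the same ascending order. A small self-contained sketch of the idiom follows; the broker names and load values are made up, and plain `Double` values stand in for the `getWeightedMaxEMA()` result.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the sorting idiom; not the Pulsar code. */
final class SortByLoadSketch {

    /** Sorts the entries in place, in ascending order of their load value. */
    static void sortByLoad(List<Map.Entry<String, Double>> brokers) {
        brokers.sort(Comparator.comparingDouble(e -> e.getValue()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> brokers = new ArrayList<>(List.of(
                Map.entry("broker-1", 0.9),
                Map.entry("broker-2", 0.2),
                Map.entry("broker-3", 0.5)));
        sortByLoad(brokers);
        // Least-loaded broker first, most-loaded last.
        for (Map.Entry<String, Double> e : brokers) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```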

return new NamespaceBundle(NamespaceName.get(namespace), hashRange, factory);
}).when(factory).getBundle(anyString(), anyString());
doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any());
doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager();

Member

Suggested change
- doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager();
+ doReturn(new AtomicReference<>(loadManagerWrapper)).when(pulsar).getLoadManager();

Contributor Author

updated.


private PulsarService getMockPulsar() {
var pulsar = mock(PulsarService.class);
private PulsarService getMockPulsar(PulsarService pulsar) {

Member

This method is never used right now. Please remove it.

Contributor Author

updated.

public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException {
var pulsar = getMockPulsar();
TransferShedder transferShedder = new TransferShedder(pulsar, antiAffinityGroupPolicyHelper);
//var pulsar = getMockPulsar();

Member

We can remove it.

Contributor Author

updated.

partitionSort(arr, topk);

- for (int i = 0; i < topk; i++) {
+ for (int i = topk - 1; i >= 0; i--) {

Member

Why reverse the topK bundles order?

Contributor Author

We want to transfer lower-loaded bundles first to meet the target threshold. If the highest-loaded bundle alone carries more traffic than we need to offload, we can't unload anything at all.
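
The reasoning in this reply can be sketched as follows. This is an illustration only: the method name, the "offload budget" framing, and the rule of stopping at the first bundle that no longer fits are assumptions, not the selection logic from this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch; not the TransferShedder implementation. */
final class AscendingTransferSketch {

    /**
     * Walks bundle loads in ascending order and selects bundles while they
     * still fit in the remaining offload budget; stops at the first that does not.
     */
    static List<Double> selectForTransfer(List<Double> loads, double target) {
        List<Double> sorted = new ArrayList<>(loads);
        sorted.sort(Comparator.naturalOrder());
        List<Double> selected = new ArrayList<>();
        double remaining = target;
        for (double load : sorted) {
            if (load > remaining) {
                break; // every later bundle is at least this large
            }
            selected.add(load);
            remaining -= load;
        }
        return selected;
    }

    public static void main(String[] args) {
        // Budget of 30: the 50-unit bundle can never fit, but 5 and 10 can.
        System.out.println(selectForTransfer(List.of(50.0, 5.0, 10.0, 20.0), 30.0));
        // prints [5.0, 10.0]
    }
}
```

Starting from the heaviest bundle instead, the 50-unit bundle would exceed the 30-unit budget immediately, which matches the concern that nothing could be unloaded.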

@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch from 31a3c3c to 0ca5ae3 Compare March 21, 2023 02:05
@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch from 0ca5ae3 to f310c4f Compare March 21, 2023 23:46
@heesung-sohn

Contributor Author

Rebase is done. Please continue the review.

@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch 3 times, most recently from d619eb9 to cbac501 Compare March 22, 2023 22:22
@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch from cbac501 to 1cc8aab Compare March 22, 2023 22:22

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
executor.execute(() -> {

Member

Is there any reason to use an executor here? I did not see any blocking operation here.

Contributor Author

The intention was to run handleEvent in a single-threaded manner. I agree that we don't have to use an executor here. Removing it.
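
For context, the pattern being discussed (and removed) here is handing each event to a single-threaded executor so that handlers never run concurrently. A minimal sketch of that pattern, with hypothetical class and method names and a plain list standing in for the handler's state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of serializing event handling; not the Pulsar code. */
final class SerializedHandlerSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Mutated only on the executor thread, so no extra locking is needed.
    private final List<String> handled = new ArrayList<>();

    /** May be called from any thread; the work runs one task at a time, in submission order. */
    void handleEvent(String serviceUnit) {
        executor.execute(() -> handled.add(serviceUnit));
    }

    /** Stops accepting events, waits for queued handlers, and returns what was handled. */
    List<String> drain() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        SerializedHandlerSketch sketch = new SerializedHandlerSketch();
        sketch.handleEvent("bundle-a");
        sketch.handleEvent("bundle-b");
        System.out.println(sketch.drain());
    }
}
```

The trade-off raised in the review is that this indirection adds a thread hop for work that does not block, which is why the executor was dropped.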

task = loadManagerExecutor.scheduleAtFixedRate(() -> {
try {
execute();
if (conf.isLoadBalancerDebugModeEnabled()) {

Member

Why don't we use debugMode here?

Contributor Author

updated.

private MinMaxPriorityQueue<String> maxBrokers;
private LoadDataStore<BrokerLoadData> loadDataStore;

private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad;

Member

Suggested change
private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad;
private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad;

Contributor Author

Updated.

@heesung-sohn heesung-sohn force-pushed the pip-192-unload-improve branch from a9b4171 to 459631b Compare March 25, 2023 20:42
@Technoboy- Technoboy- added this to the 3.0.0 milestone Mar 27, 2023
@Technoboy- Technoboy- closed this Mar 27, 2023
@Technoboy- Technoboy- reopened this Mar 27, 2023
@heesung-sohn

Contributor Author

@Demogorgon314 can we merge this pr?

@Demogorgon314 Demogorgon314 merged commit ee5ac86 into apache:master Mar 29, 2023
@heesung-sohn heesung-sohn deleted the pip-192-unload-improve branch April 2, 2024 17:44

Labels

area/broker doc-not-needed Your PR changes do not impact docs ready-to-test type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

4 participants