
Conversation

@lhotari (Member) commented Jan 27, 2025

Fixes #23504
Fixes #23506

Motivation

The implementation prior to this PR has multiple problems, described in #23504 and #23506.

Modifications

Enhanced permit acquisition (see the sketch at the end of this Modifications section):

  • Added queuing for pending requests when the limit is reached
  • Configurable queue size via managedLedgerMaxReadsInFlightPermitsAcquireQueueSize
  • Timeout configuration via managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis
  • Asynchronous completion of waiting requests to eliminate tight-loop spinning
  • Fast path (no queuing) when permits are immediately available

Improved size estimation:

  • Now uses average entry size for more accurate calculations
  • Previously used only the latest result's size

managedLedgerMaxReadsInFlightSizeInMB limit is now applied to replay queue reads
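Below is a minimal sketch of the queued permit-acquisition idea described above. It is illustrative only: the class and method names are hypothetical, and this is not the actual InflightReadsLimiter code. It shows the fast path, the bounded wait queue with an acquire timeout, and asynchronous FIFO completion of waiters instead of tight-loop spinning.

// Hypothetical sketch (not the actual Pulsar implementation) of asynchronous,
// queued permit acquisition with a bounded queue and an acquire timeout.
import java.util.ArrayDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

class QueuedPermitLimiterSketch {
    // Waiting request: how many permits it needs, when it times out, and how to notify it.
    record Waiter(long permits, long deadlineMillis, Consumer<Boolean> callback) {}

    private final long maxPermits;            // e.g. managedLedgerMaxReadsInFlightSizeInMB, in bytes
    private final int maxQueueSize;           // e.g. managedLedgerMaxReadsInFlightPermitsAcquireQueueSize
    private final long acquireTimeoutMillis;  // e.g. managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final ArrayDeque<Waiter> queue = new ArrayDeque<>();
    private long usedPermits;

    QueuedPermitLimiterSketch(long maxPermits, int maxQueueSize, long acquireTimeoutMillis) {
        this.maxPermits = maxPermits;
        this.maxQueueSize = maxQueueSize;
        this.acquireTimeoutMillis = acquireTimeoutMillis;
    }

    /** Fast path: returns true and grants permits immediately when possible; otherwise queues. */
    synchronized boolean acquire(long permits, Consumer<Boolean> callback) {
        if (queue.isEmpty() && usedPermits + permits <= maxPermits) {
            usedPermits += permits;
            return true;                       // no queuing needed
        }
        if (queue.size() >= maxQueueSize) {
            callback.accept(false);            // bounded queue is full: reject
            return false;
        }
        queue.add(new Waiter(permits, System.currentTimeMillis() + acquireTimeoutMillis, callback));
        return false;                          // caller continues when the callback fires
    }

    /** Releases permits and completes queued waiters in FIFO order, asynchronously (no spinning). */
    synchronized void release(long permits) {
        usedPermits -= permits;
        while (true) {
            Waiter head = queue.peek();
            if (head == null) {
                break;
            }
            if (head.deadlineMillis() < System.currentTimeMillis()) {
                queue.poll();
                executor.execute(() -> head.callback().accept(false));   // acquire timed out
            } else if (usedPermits + head.permits() <= maxPermits) {
                queue.poll();
                usedPermits += head.permits();
                executor.execute(() -> head.callback().accept(true));    // permits granted
            } else {
                break;                         // head still doesn't fit: keep FIFO fairness
            }
        }
    }
}

A real implementation also needs to enforce the acquire timeout independently of release() (for example with a scheduled task); the sketch omits that for brevity and only illustrates the queuing and fairness mechanics.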

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@heesung-sohn (Contributor)

Could we confirm if we are not over-engineering this rate limiter?

  • Can't we reuse any existing queue? Why do we need a separate queue here?
  • Why can't we use a simple Guava RateLimiter or another simpler method?

@shibd (Member) commented Jan 28, 2025

It looks like new configurations have been added and the code has been refactored. I think we need a PIP and should avoid cherry-picking back to previous versions.

@lhotari (Member, Author) commented Jan 28, 2025

It looks like new configurations have been added and the code has been refactored. I think we need a PIP and should avoid cherry-picking back to previous versions.

@shibd This PR is addressing bugs in the existing implementation, #23506 and #23504. I can always create a PIP to cover things; however, I'd rather first have this PR reviewed. Avoiding cherry-picking back to previous versions doesn't make sense, since this PR addresses clear issues that Pulsar 3.0.x users are also facing.
This PR now contains an unrelated fix for issue #23900 which is covered by PR #23903. Once that PR is merged, I'll rebase this PR.

@lhotari (Member, Author) commented Jan 28, 2025

Could we confirm if we are not over-engineering this rate limiter?

Good questions, @heesung-sn.

  • Can't we reuse any existing queue? Why do we need a separate queue here?

That's a valid point. In the current solution there's also a queue when managedLedgerReadEntryTimeoutSeconds is set: the executor's task queue is used as the waiting queue. That's a bad solution, since waiting requests spin in a tight loop until more permits become available, which consumes unnecessary CPU. In addition, there's a fairness problem: requests acquiring permits aren't handled in a consistent order. The separate queue in InflightReadsLimiter addresses both of these issues.
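To make the tight-loop problem concrete, here is a hypothetical simplification (not the actual Pulsar code) of what using the executor's task queue as the waiting mechanism looks like: a read that can't get permits has nothing better to do than resubmit itself.

// Hypothetical illustration of the "executor queue as waiting queue" anti-pattern.
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

class TightLoopSketch {
    static void readWhenPermitsAvailable(Executor executor, BooleanSupplier tryAcquire, Runnable read) {
        executor.execute(() -> {
            if (tryAcquire.getAsBoolean()) {
                read.run();
            } else {
                // No permits: immediately resubmit to the executor. The task spins in a
                // tight loop, burning CPU, and newer requests can overtake older ones
                // because ordering depends on executor scheduling rather than a FIFO queue.
                readWhenPermitsAvailable(executor, tryAcquire, read);
            }
        });
    }
}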

Why do we need a separate queue here?

There will always be a need for a queue unless excessive traffic is shed by rejecting requests and making them fail. It's a valid question whether the implementation could queue requests onto some other existing queue instead, or simply fail them.

The current solution already rejects requests when the limit is reached. That's a problem, especially for enabling the managedLedgerMaxReadsInFlightSizeInMB limit for replay queue reads.

The scope of this PR is to apply the managedLedgerMaxReadsInFlightSizeInMB limit for replay queue reads. If individual reads fail, the complete result will be discarded due to this logic:

@Override
public synchronized void readEntryComplete(Entry entry, Object ctx) {
    if (exception.get() != null) {
        // if there is already a failure for a different position, we should release the entry straight away
        // and not add it to the list
        entry.release();
        if (--pendingCallbacks == 0) {
            callback.readEntriesFailed(exception.get(), ctx);
        }
    } else {
        entries.add(entry);
        if (--pendingCallbacks == 0) {
            if (sortEntries) {
                entries.sort(ENTRY_COMPARATOR);
            }
            callback.readEntriesComplete(entries, ctx);
        }
    }
}

@Override
public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx) {
    log.warn("[{}][{}] Error while replaying entries", ledger.getName(), name, mle);
    if (exception.compareAndSet(null, mle)) {
        // release the entries just once, any further read success will release the entry straight away
        entries.forEach(Entry::release);
    }
    if (--pendingCallbacks == 0) {
        callback.readEntriesFailed(exception.get(), ctx);
    }
}

(I have other plans to improve this logic, but that's not the scope of this PR.)

That's why failing individual read operations isn't a proper way to trigger backpressure in Pulsar. It will add even more load to the system when retries happen. A better approach is to slow down processing (queue) and let all other backpressure solutions propagate and eventually slow down incoming work into the system. That's how backpressure should work, but there are currently gaps in Pulsar.

There's a broader design problem: backpressure isn't designed end-to-end so that this would actually happen and would be validated with analysis and tests. There was a discussion about backpressure on the dev mailing list in 2022 without much participation from the community (backpressure was also covered briefly in the rate limiting discussions).

In general, there are solutions for minimal queues in backpressure solutions. When all operations are blocking operations, it's a simple problem to solve. With asynchronous operations, it becomes a hard problem to solve. In the JVM space, the Reactive Streams specification and libraries are explicitly focused on solving asynchronous non-blocking backpressure with minimal queues (buffers) in processing pipelines.

In the case of the managedLedgerMaxReadsInFlightSizeInMB limit, I believe that queuing in the way that is handled in this PR is the correct solution for handling backpressure. The reason for this is that the majority of read operations are created by dispatchers in Pulsar. There's a finite number of dispatchers at all times and each dispatcher will have at most 2 read operations (1 pending read and 1 pending replay queue read) in flight at a time. This makes it a "closed system". When the limit is reached, the dispatchers won't be adding more work into the system and will wait for more permits to be available.

  • Why can't we use a simple Guava RateLimiter or another simpler method?

Essentially, InflightReadsLimiter is like Java's java.util.concurrent.Semaphore. It's not a rate limiter where the rate is defined in terms of requests per second.
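As a rough analogy (a sketch only, assuming byte counts fit into an int; the actual limiter completes waiters asynchronously rather than blocking threads), it is closer to a byte-counting Semaphore than to a requests-per-second rate limiter:

// Analogy sketch: in-flight read bytes as semaphore permits, not requests per second.
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class InFlightBytesSemaphoreAnalogy {
    private final Semaphore inFlightBytes;

    InFlightBytesSemaphoreAnalogy(int maxReadsInFlightSizeInBytes) {
        // "fair" = waiters are served in FIFO order, analogous to the consistent
        // ordering that the separate queue provides.
        this.inFlightBytes = new Semaphore(maxReadsInFlightSizeInBytes, true);
    }

    boolean acquireForRead(int estimatedReadSizeBytes, long timeoutMillis) throws InterruptedException {
        // Blocks the calling thread up to the timeout; the real limiter must not block
        // broker threads, which is why it queues callbacks instead of using a Semaphore.
        return inFlightBytes.tryAcquire(estimatedReadSizeBytes, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void releaseAfterRead(int readSizeBytes) {
        inFlightBytes.release(readSizeBytes);
    }
}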

The InflightReadsLimiter class is already very simple:

  • The complete class file, including all comment headers, comments and empty lines, is 285 lines.
  • A lot of the code lines are related to metrics.

While there might be room for further simplification in the InflightReadsLimiter, I believe the current implementation strikes a good balance between functionality and simplicity. The core logic is quite straightforward, and the additional code is mostly for metrics and monitoring, which are important for production systems. How would you suggest simplifying the InflightReadsLimiter?

@lhotari force-pushed the lh-improve-inflight-limiter branch from d6c1669 to b838cd1 on January 28, 2025 at 23:59
@codecov-commenter commented Jan 29, 2025

Codecov Report

Attention: Patch coverage is 77.60618% with 58 lines in your changes missing coverage. Please review.

Project coverage is 74.18%. Comparing base (bbc6224) to head (d4fbb38).
Report is 876 commits behind head on master.

Files with missing lines                                 Patch %   Lines
...keeper/mledger/impl/cache/RangeEntryCacheImpl.java    67.77%    21 Missing and 8 partials ⚠️
...eeper/mledger/impl/cache/InflightReadsLimiter.java    80.39%    10 Missing and 10 partials ⚠️
...ache/pulsar/broker/ManagedLedgerClientFactory.java    54.54%    3 Missing and 2 partials ⚠️
...keeper/mledger/impl/cache/PendingReadsManager.java    91.30%    3 Missing and 1 partial ⚠️
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23901      +/-   ##
============================================
+ Coverage     73.57%   74.18%   +0.60%     
+ Complexity    32624    31858     -766     
============================================
  Files          1877     1853      -24     
  Lines        139502   143733    +4231     
  Branches      15299    16337    +1038     
============================================
+ Hits         102638   106627    +3989     
+ Misses        28908    28694     -214     
- Partials       7956     8412     +456     
Flag Coverage Δ
inttests 26.70% <35.52%> (+2.12%) ⬆️
systests 23.20% <35.52%> (-1.12%) ⬇️
unittests 73.70% <77.60%> (+0.86%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...bookkeeper/mledger/ManagedLedgerFactoryConfig.java 100.00% <ø> (+10.52%) ⬆️
...kkeeper/mledger/impl/ManagedLedgerFactoryImpl.java 59.18% <100.00%> (-22.21%) ⬇️
...mledger/impl/cache/RangeEntryCacheManagerImpl.java 96.25% <100.00%> (+0.09%) ⬆️
...org/apache/pulsar/broker/ServiceConfiguration.java 98.16% <100.00%> (-1.24%) ⬇️
...keeper/mledger/impl/cache/PendingReadsManager.java 86.86% <91.30%> (+0.19%) ⬆️
...ache/pulsar/broker/ManagedLedgerClientFactory.java 82.29% <54.54%> (-1.86%) ⬇️
...eeper/mledger/impl/cache/InflightReadsLimiter.java 82.75% <80.39%> (+0.61%) ⬆️
...keeper/mledger/impl/cache/RangeEntryCacheImpl.java 65.83% <67.77%> (+7.08%) ⬆️

... and 1024 files with indirect coverage changes

@merlimat merged commit c5173d5 into apache:master on Jan 29, 2025 (52 checks passed).
lhotari added commits that referenced this pull request on Jan 29, 2025
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 31, 2025
… replay queue reads (apache#23901)

(cherry picked from commit c5173d5)
(cherry picked from commit 37f0bc2)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 3, 2025
… replay queue reads (apache#23901)

(cherry picked from commit c5173d5)
(cherry picked from commit 37f0bc2)
@BewareMyPower (Contributor)

This PR is too huge as a "bug fix". How could a bug fix be so huge?

It's more like a new feature in which a fix is hidden. In addition to #23952, which you found, we also found a memory leak due to ByteBuf objects not released.

BTW, I agree with @heesung-sn's comment here.

@lhotari (Member, Author) commented Feb 10, 2025

This PR is too huge as a "bug fix". How could a bug fix be so huge?

@BewareMyPower Please see issues #23504 and #23506. The original solution has never worked properly.

It's more like a new feature in which a fix is hidden. In addition to #23952, which you found, we also found a memory leak due to ByteBuf objects not released.

Where is this issue reported?

BTW, I agree with @heesung-sn's comment here.

Sure, everyone has a right to their own opinion. If you look at the changes in this PR, they are all relevant to solving the issues.

@lhotari (Member, Author) commented Feb 10, 2025

BTW, I agree with @heesung-sn's comment here.

@BewareMyPower I have replied to the questions in #23901 (comment). I hope that sheds light on why this PR is relevant and necessary. Please let me know if you have any questions.

@BewareMyPower (Contributor)

I'm currently reviewing this PR so I can get a better understanding of it. Generally, we need a careful review of such a huge PR, but it's still hard. From my experience, it's very easy for bugs to go unnoticed in a huge PR.

we also found a memory leak due to ByteBuf objects not released.

It's in SN's private repo, let me ping you in that issue. BTW, @shibd I think we should report it in the Apache repo as well.

@lhotari (Member, Author) commented Feb 10, 2025

I'm currently reviewing this PR so I can get a better understanding of it. Generally, we need a careful review of such a huge PR, but it's still hard. From my experience, it's very easy for bugs to go unnoticed in a huge PR.

@BewareMyPower I don't disagree with you about the size of a PR. Large PRs are harder to review and there are more chances that the changes cause regressions.

It's also good to notice that if bugs pass CI and all regression test suites, that's a gap in the test suites, which should be addressed as well.

Some problems require larger PRs than others. If we start bashing contributors who fix broader issues, it's going to kill this project. Instead of complaining about large PRs and regressions, please put the effort into improving test suites to catch regressions and into reviews when they're needed. That's simply more productive for everyone.

we also found a memory leak due to ByteBuf objects not released.

It's in SN's private repo, let me ping you in that issue. BTW, @shibd I think we should report it in the Apache repo as well.

Memory leaks could also be seen as a failure of our test suites to catch them. To address that, it would be possible to run unit tests with Netty's paranoid leak detection level and fail the tests if any leaks are detected. This would be an effective way to catch such problems as early as possible.
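For example, here is a sketch of enabling Netty's paranoid leak detection in a test JVM. The system property and API shown are standard Netty, nothing Pulsar-specific; wiring a test listener that actually fails the build on LEAK reports is left out.

// Sketch: enable Netty's paranoid leak detection so unreleased ByteBufs produce LEAK reports.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ResourceLeakDetector;

public class LeakDetectionSketch {
    public static void main(String[] args) {
        // Equivalent to starting the JVM with -Dio.netty.leakDetection.level=paranoid:
        // every buffer is tracked, and a LEAK log entry appears when a buffer is
        // garbage-collected without release(). Tests can then be failed on such entries.
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        ByteBuf buf = Unpooled.buffer(16);
        buf.writeInt(42);
        buf.release();  // forgetting this release() is what paranoid mode would flag
    }
}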

@BewareMyPower (Contributor)

we also found a memory leak due to ByteBuf objects not released.

To avoid misleading anyone else: the memory leak issue is not caused by this PR, see #23955.

@lhotari (Member, Author) commented Feb 11, 2025

Fix for the PendingReadsManager deadlock related to this change: #23958

hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Feb 26, 2025
… LTS

- Remove `managedLedgerOffloadReadThreads` from apache#24025
- Remove `managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis` and
  `managedLedgerMaxReadsInFlightPermitsAcquireQueueSize` from apache#23901
- Remove `managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis` from apache#22792
The configs above only increase the complexity and are hard to configure.

Add more comments to `managedLedgerPersistIndividualAckAsLongArray` from apache#23759.
This config was added to keep compatibility with 3.x or earlier, so it has value to retain. 3.x users must configure it to false when upgrading to 4.0.
