Skip to content

[improve][broker] Pulsar Rate Limiting Refactoring changes (PIP-322)#21681

Merged
merlimat merged 155 commits into
apache:masterfrom
lhotari:lh-rate-limiter-refactoring
Dec 15, 2023
Merged

[improve][broker] Pulsar Rate Limiting Refactoring changes (PIP-322)#21681
merlimat merged 155 commits into
apache:masterfrom
lhotari:lh-rate-limiter-refactoring

Conversation

@lhotari

@lhotari lhotari commented Dec 6, 2023

Copy link
Copy Markdown
Member

Fixes #21442
PIP: #21680

Motivation

See PIP-322 for details.

Modifications

See PIP-322 for details.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari marked this pull request as ready for review December 11, 2023 20:31
@lhotari lhotari requested a review from Shawyeok December 11, 2023 20:33
@lhotari lhotari marked this pull request as draft December 12, 2023 12:18
@lhotari lhotari marked this pull request as ready for review December 13, 2023 16:42
@lhotari lhotari marked this pull request as draft December 13, 2023 16:54
@lhotari lhotari marked this pull request as ready for review December 14, 2023 15:12

@merlimat merlimat left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work here!

Can we also include the JMH benchmark from https://github.com/lhotari/async-tokenbucket/blob/master/src/jmh/java/com/github/lhotari/asynctokenbucket/AsyncTokenBucketBenchmark.java . It would be great to use it down the road for validating further changes to the implementation.


@Override
public void close() {
closed = true;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wait here until the internal thread is gone?

Eg:

  1. We interrupt the thread
  2. Use a barrier to wait on cloe

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll switch to use Thread.interrupt()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Interface for a clock source that returns a monotonic time in nanoseconds with a required precision.
*/
public interface MonotonicClockSource {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep this in a separate file as it might be interesting for usages other than the rate limiter.

@lhotari lhotari Dec 15, 2023

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. The reason to put everything in a single file was to make AsyncTokenBucket self contained at first. I'm thinking of moving all AsyncTokenBucket files to a separate "qos" package.

One detail about this MonotonicClockSource interface is designed for AsyncTokenBucket's use case, so I'll need to find a way to express that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved by moving to separate file and renaming to MonotonicSnapshotClock

* if false, the returned value can be a granular precision monotonic time in nanoseconds.
* @return the current monotonic clock time in nanoseconds
*/
long getNanos(boolean highPrecision);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we have to mix high/low precision in the same rate limiter instance? Otherwise, we could just have different implementations, for different tradeoffs or scopes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for AsyncTokenBucket's use case. I'll try to make it more understandable. The "highPrecision" is now perhaps causing the confusion. The use case in AsyncTokenBucket is with the usages where a consistent view of the token balance is requested.

I'll rename the parameter or add documentation to make it clear why there's both high/low precision in the same interface and method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I am renaming MonotonicClockSource to MonotonicSnapshotClock and making it explicit that it's a clock that snapshots the underlying high precision clock source periodically. I am renaming "highPrecision" to "requestSnapshot" to make it clear that when the flag is set, the snapshot is updated in the call before returning the value.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. PTAL.

* if false, the returned value can be a granular precision monotonic time in nanoseconds.
* @return the current monotonic clock time in nanoseconds
*/
long getNanos(boolean highPrecision);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe rename to getCurrentTimeNanos() ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I'll rename it. getCurrentTickNanos perhaps. The reason for this is that time could be confusing since it's not the time of the day (like it is in System.currentTimeMillis(). I think tick might be a better description of what the returned value is.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming to getTickNanos

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Instantiating this class creates a daemon thread that updates the monotonic time. The close method
* should be called to stop the thread.
*/
public static class GranularMonotonicClockSource implements MonotonicClockSource, AutoCloseable {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move to separate file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* constructing higher-level asynchronous rate limiter implementations, which require side effects for throttling.
* <p>To achieve optimal performance, pass a {@link GranularMonotonicClockSource} as the clock source.
*/
public abstract class AsyncTokenBucket {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many classes/interfaces here. Let's move this to a dedicated package and have all classes in separate files.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@lhotari

lhotari commented Dec 15, 2023

Copy link
Copy Markdown
Member Author

Great work here!

Can we also include the JMH benchmark from https://github.com/lhotari/async-tokenbucket/blob/master/src/jmh/java/com/github/lhotari/asynctokenbucket/AsyncTokenBucketBenchmark.java . It would be great to use it down the road for validating further changes to the implementation.

@merlimat Thanks for that review! Great idea! I added a module "microbench". There's also a README.md file that explains how to compile and run the JMH benchmarks.
I have now addressed your review comments. PTAL

@merlimat merlimat merged commit c4cff0a into apache:master Dec 15, 2023
@nodece

nodece commented Jan 31, 2024

Copy link
Copy Markdown
Member

This PR looks to break the resourcegroup rate limiter, please see the Prometheus graph:

image

When using the 3.3.0 version, the pulsar_rate_in metric is not as smooth as the 3.0.1 version.

The following is the broker.conf:

PULSAR_PREFIX_resourceUsageTransportClassName=org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager

Test:

bin/pulsar-admin resourcegroups create rg-test-1 -mp 200
bin/pulsar-admin namespaces set-resource-group public/default -rgn rg-test-1
bin/pulsar-perf produce rg-test-1 --disable-batching -r 600 --size 10

Could you help to check out this issue?

Demogorgon314 added a commit to Demogorgon314/kop that referenced this pull request Apr 15, 2026
### Motivation

[Pulsar#21681](apache/pulsar#21681) introduced
new API changes, which caused a compatibility issue.

### Modifications

Use `incrementPublishCount(producer, numMessages, numBytes);` to set the
Rate Limiting required counts.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker doc-not-needed Your PR changes do not impact docs ready-to-test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] RateLimiter lock contention when use precise publish rate limiter

6 participants