[improve][broker] Pulsar Rate Limiting Refactoring changes (PIP-322)#21681
Conversation
… volatile int field
…ishBufferSizeInMB
merlimat
left a comment
There was a problem hiding this comment.
Great work here!
Can we also include the JMH benchmark from https://github.com/lhotari/async-tokenbucket/blob/master/src/jmh/java/com/github/lhotari/asynctokenbucket/AsyncTokenBucketBenchmark.java . It would be great to use it down the road for validating further changes to the implementation.
|
|
||
| @Override | ||
| public void close() { | ||
| closed = true; |
There was a problem hiding this comment.
Should we wait here until the internal thread is gone?
Eg:
- We interrupt the thread
- Use a barrier to wait on cloe
There was a problem hiding this comment.
I'll switch to use Thread.interrupt()
| /** | ||
| * Interface for a clock source that returns a monotonic time in nanoseconds with a required precision. | ||
| */ | ||
| public interface MonotonicClockSource { |
There was a problem hiding this comment.
I'd keep this in a separate file as it might be interesting for usages other than the rate limiter.
There was a problem hiding this comment.
+1. The reason to put everything in a single file was to make AsyncTokenBucket self contained at first. I'm thinking of moving all AsyncTokenBucket files to a separate "qos" package.
One detail about this MonotonicClockSource interface is designed for AsyncTokenBucket's use case, so I'll need to find a way to express that.
There was a problem hiding this comment.
Resolved by moving to separate file and renaming to MonotonicSnapshotClock
| * if false, the returned value can be a granular precision monotonic time in nanoseconds. | ||
| * @return the current monotonic clock time in nanoseconds | ||
| */ | ||
| long getNanos(boolean highPrecision); |
There was a problem hiding this comment.
Would we have to mix high/low precision in the same rate limiter instance? Otherwise, we could just have different implementations, for different tradeoffs or scopes.
There was a problem hiding this comment.
This is needed for AsyncTokenBucket's use case. I'll try to make it more understandable. The "highPrecision" is now perhaps causing the confusion. The use case in AsyncTokenBucket is with the usages where a consistent view of the token balance is requested.
I'll rename the parameter or add documentation to make it clear why there's both high/low precision in the same interface and method.
There was a problem hiding this comment.
Update: I am renaming MonotonicClockSource to MonotonicSnapshotClock and making it explicit that it's a clock that snapshots the underlying high precision clock source periodically. I am renaming "highPrecision" to "requestSnapshot" to make it clear that when the flag is set, the snapshot is updated in the call before returning the value.
| * if false, the returned value can be a granular precision monotonic time in nanoseconds. | ||
| * @return the current monotonic clock time in nanoseconds | ||
| */ | ||
| long getNanos(boolean highPrecision); |
There was a problem hiding this comment.
nit: maybe rename to getCurrentTimeNanos() ?
There was a problem hiding this comment.
+1, I'll rename it. getCurrentTickNanos perhaps. The reason for this is that time could be confusing since it's not the time of the day (like it is in System.currentTimeMillis(). I think tick might be a better description of what the returned value is.
There was a problem hiding this comment.
Renaming to getTickNanos
| * Instantiating this class creates a daemon thread that updates the monotonic time. The close method | ||
| * should be called to stop the thread. | ||
| */ | ||
| public static class GranularMonotonicClockSource implements MonotonicClockSource, AutoCloseable { |
There was a problem hiding this comment.
We can move to separate file
| * constructing higher-level asynchronous rate limiter implementations, which require side effects for throttling. | ||
| * <p>To achieve optimal performance, pass a {@link GranularMonotonicClockSource} as the clock source. | ||
| */ | ||
| public abstract class AsyncTokenBucket { |
There was a problem hiding this comment.
There are many classes/interfaces here. Let's move this to a dedicated package and have all classes in separate files.
…ng duration directly
@merlimat Thanks for that review! Great idea! I added a module "microbench". There's also a README.md file that explains how to compile and run the JMH benchmarks. |
### Motivation [Pulsar#21681](apache/pulsar#21681) introduced new API changes, which caused a compatibility issue. ### Modifications Use `incrementPublishCount(producer, numMessages, numBytes);` to set the Rate Limiting required counts.

Fixes #21442
PIP: #21680
Motivation
See PIP-322 for details.
Modifications
See PIP-322 for details.
Documentation
docdoc-requireddoc-not-neededdoc-complete