Add rate limiter primitives #235
Conversation
):
  /** Limits the rate of execution of the given operation
    */
  def apply[T](operation: => T): Option[T] =
Do I understand correctly that the result is None when the limit is exceeded and the policy is Drop, and Some(_) when the limit is not exceeded, or when it is exceeded but the policy is Block?
If so, I think we'd have to split this into two operations: RateLimiter.runBlocking(t: T): T and RateLimiter.runOrDrop(t: T): Option[T]. There definitely are scenarios for both policies, but the most basic use-case is to run an operation and block if the limit is exceeded. If you know that the policy is Block, you'd always have to .get the returned Option, which is bad.
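The shape of the split API proposed here could be sketched as follows. `SimpleLimiter` is an illustrative stand-in (a permit-on-completion concurrency limiter, not the PR's time-based algorithms); only the two method signatures are the point.

```scala
import java.util.concurrent.Semaphore

// Hypothetical shape of the split API: a blocking variant returning T
// directly, and a dropping variant returning Option[T].
class SimpleLimiter(maxConcurrent: Int):
  private val semaphore = new Semaphore(maxConcurrent)

  // Blocks until a permit is available, then runs the operation.
  def runBlocking[T](operation: => T): T =
    semaphore.acquire()
    try operation
    finally semaphore.release()

  // Runs the operation only if a permit is immediately available,
  // otherwise drops it and returns None.
  def runOrDrop[T](operation: => T): Option[T] =
    if semaphore.tryAcquire() then
      try Some(operation)
      finally semaphore.release()
    else None
```

With this split, a caller who knows the policy is blocking gets a plain `T` and never has to `.get` an `Option`.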
Thanks for the review! That was the intended behaviour. The problem with splitting is that it would make new BlockingPolicy implementations difficult to write. For example, users might want a custom policy where the operation is just slowed down, or one that blocks the first operations and drops the rest. A possibility would be to continue with just apply by making RateLimiter generic:
RateLimiter[F[_]]:
  def apply[T](operation: => T): F[T]
I also think it would be a good idea to allow running with a particular configuration, so the final API would be the following. Actually, we could use dependent types:
RateLimiter(config: RateLimiterConfig):
  def apply[T](operation: => T): config.F[T]
  def apply[T](operation: => T, opCfg: Cfg): config.F[T]
This would allow a custom BlockingPolicy to implement blocking or dropping (or something different like throttling) behaviour per operation:
rateLimiter(operation, CustomCfg.block())
rateLimiter(operation, CustomCfg.drop())
The only disadvantage might be verbosity, but I believe the possibility of custom implementations outweighs it. What are your thoughts before proceeding?
This could work, but I'm afraid it would be too complicated. You're right that we might lose some flexibility, but the main goal should be to address the most common use-cases - which have to be served well. To be honest, cases such as slowing down the first operation / dropping the rest seem quite specialised, and it would be totally acceptable for them to require writing some custom code. That is, you could reuse the algorithm part, but everything around it would need to be written by hand.
So I'd opt for a simple interface (no dependent / higher-order types) solving the common case, while providing building blocks for implementing more advanced use-cases
if config.blockingPolicy.isUnblocked then
  if config.algorithm.isUnblocked then
    if config.isReady then
      config.acceptOperation
I didn't dive into the implementation yet, but isn't this racy? That is, if two threads concurrently proceed through the three ifs, they could both concurrently call .acceptOperation, even if this would exceed the limit? It feels like accepting should be an atomic operation, which might fail (due to other threads exceeding the limit).
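The check-then-act problem described here can be avoided by collapsing the checks into one atomic reservation. A minimal sketch, using a `Semaphore` as the illustrative mechanism (the PR may use a different one):

```scala
import java.util.concurrent.Semaphore

// Instead of several separate `if` checks followed by acceptOperation,
// a single tryAcquire both checks and reserves a slot atomically.
class AtomicAcceptance(limit: Int):
  private val permits = new Semaphore(limit)

  // Returns true only if the operation was atomically accepted;
  // concurrent callers can never jointly exceed `limit`.
  def tryAccept(): Boolean = permits.tryAcquire()
```

Two threads racing through `tryAccept` can no longer both succeed once the limit is reached, because the check and the state change happen in one step.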
scope.shutdown()
scope.join().discard
// join might have been interrupted
try f(using capability)
was this reformatted by accident? it doesn't look right, maybe we need braces
Thanks for the PR! I left some initial comments. Once these are resolved I'll do a more thorough review. One thing that's missing and that we'd definitely need is some documentation (in
@adamw I've addressed the comments and added documentation. I've split the API in
def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
  val future = executor.add(algorithm, operation)
  executor.execute(algorithm, operation)
  Await.result(future, Duration.Inf)
I'm afraid we can't simply use futures. One, because it introduces an unnecessary asynchronous barrier in the "happy path" (when the rate limiter is open, the operation should just go through on the same thread). Two, because it disrupts the stack trace in case of an exception - either losing context or adding an additional layer of exceptions.
So we want to run the operations on the same thread, from which they are called, optionally blocking, if necessary
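The same-thread, optionally-blocking execution asked for here can be sketched as follows. This is a minimal illustration (names are hypothetical); note the permit is deliberately not released on completion - in a rate limiter, permits would be replenished by time, not by the operation finishing.

```scala
import java.util.concurrent.Semaphore

// The caller blocks on acquire (no Future, no thread hop) and then runs
// the operation directly, so the happy path stays synchronous and any
// exception keeps its original stack trace.
class SameThreadRun(permits: Semaphore):
  def run[T](operation: => T): T =
    permits.acquire() // blocks the calling thread if the limit is reached
    operation         // executed on the caller's own thread
```

When the limiter is open, `run` is just an `acquire` that succeeds immediately followed by a direct call, with no asynchronous barrier.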
I've changed the apply method so that it executes the operation in the same thread. I'm not sure if it would be possible to avoid the use of Future or some synchronization in the implementation of the blocking algorithm, because ideally blocked operations would be executed before the ones scheduled later on. If we checked whether the queue was full before checking whether the rate limiter is open, we would need a lock or something to do this atomically. The dropping one is now free of Futures.
Hm yes good point. Let's delegate this to the VT scheduler, and document properly: that we don't guarantee that the first operation to become blocked will be the first to execute. And fairness is up to virtual threads.
I've added a fairness parameter for the blocking executor and the RateLimiter API that defaults to false. There is still some use of Futures which I think is unavoidable as they take care of updating the rate limiter.
It's tricky to avoid them because if you just use acquire on the Semaphore, you're blocking the thread and can't update the rate limiter. And it would be worse if there is a parallel thread taking care of this. Currently the internal state is updated if possible when trying to acquire the rate limiter, so Future appears only when scheduling updates of the internal state of the rate limiter when it's blocked.
Regardless of fairness, the operation is always executed in the same thread that called the rate limiter.
But what kind of updates do you need to do when the RateLimiter is blocked? I don't want to keep any kind of internal queue, the queue of blocked virtual threads should be enough. I might be missing something, but implementing a rate limiter doesn't seem to require that.
We shouldn't use Futures. If a background process is required, this should be done using Ox's forks & a structured concurrency scope. Using Futures means that threads have an indefinite lifetime, defeating the whole purpose of the Ox project.
Fairness refers to whether threads are given a "fair" chance to complete once blocked. But we want to delegate this to the VT scheduler as well.
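A Future-free background replenisher could be sketched like this. A plain daemon thread stands in here for what, in Ox, would be a `fork` inside a `supervised` scope (tying the replenisher's lifetime to the limiter's scope); fairness among blocked callers is left to the scheduler. All names are illustrative.

```scala
import java.util.concurrent.Semaphore

// A fixed-rate limiter whose permits are replenished by a background
// loop rather than by completing Futures.
class ReplenishingLimiter(rate: Int, periodMillis: Long):
  private val permits = new Semaphore(rate)

  private def replenishLoop(): Unit =
    try
      while true do
        Thread.sleep(periodMillis)
        // top the bucket back up to `rate` permits each period
        permits.release(rate - permits.availablePermits())
    catch case _: InterruptedException => () // exit when interrupted

  private val replenisher = new Thread(() => replenishLoop())
  replenisher.setDaemon(true)
  replenisher.start()

  def runBlocking[T](operation: => T): T =
    permits.acquire() // the caller blocks here until a permit is replenished
    operation
```

Blocked callers simply wait on the semaphore; the order in which they wake up is whatever the (virtual) thread scheduler decides.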
I see. The updates would be to update the number of permits held by the semaphore, not related to fairness or queues. This is also possible with some kind of "smart" polling. If we have 10 operations to run and we just acquire the underlying semaphore, they will just stay blocked, because no new permits will be added. So either a parallel process checks the next update time, schedules for it, and lets the threads acquire the semaphore in arbitrary order (the current implementation for unfair), or, instead of acquiring the semaphore, we just "try" acquiring it, and if it's not possible to run, we make all threads sleep until the next update and repeat this process until they are able to acquire successfully. We wouldn't actually be blocking through Semaphore.acquire. Some of these 10 operations might run after unblocking and the others would continue polling.
Maybe there is some other way but I don't see it at the moment. Are you referring to this?
Regarding the fair version, maybe there is some implementation along these lines without the use of Futures. Would you like to also have a fair version in addition, or should we just do away with it?
So I think the RateLimiter will have to be created in a supervised scope and create a fork which manages the replenishing of the permits? Maybe through an actor?
Another axis is whether we're taking into account the start time of the operation, or the completion time. So if you say "2 operations per second", does it mean that (A) two operations might start per second, or (B) two operations might run concurrently in every second? I guess that's the responsibility of the various algorithms, but it will need to be documented well.
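The two semantics can be contrasted in a short sketch (hypothetical helper names). In (A), the permit is never returned by the operation itself - only a periodic refill elsewhere gives it back, so permits count operation *starts*. In (B), the permit comes back on completion, so permits bound operations *running* concurrently in the window.

```scala
import java.util.concurrent.Semaphore

val startPermits = new Semaphore(2)
val runPermits = new Semaphore(2)

// (A) start-time semantics: the permit is given back by a timer, not here
def startSemantics[T](operation: => T): T =
  startPermits.acquire()
  operation

// (B) completion-time semantics: the permit is given back when done
def completionSemantics[T](operation: => T): T =
  runPermits.acquire()
  try operation
  finally runPermits.release()
```

Under (A) the permit count only drops over a period; under (B) a long-running operation holds its permit for as long as it runs.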
As for fairness, let's go with the simplest possible solution that is compatible with structured concurrency & the rest of the Ox project. So let's not implement a fair flag for now, instead delegating to the JVM scheduler.
I've made some modifications and now there's only the unfair approach, with state updated inside a supervised scope. The updating process is started on the first call to the rate limiter that is not able to pass. This way, there shouldn't be any overhead when the rate limiter is not blocked.
I'm not sure whether this is the right place to start the fork, so I've explored the solution of calling a modified updating method from the constructor of RateLimiter or GenericRateLimiter, which would block until the executor signals the need to unblock. This approach hung indefinitely when eventually calling ox.sleep, and I'm not sure if this is the expected behavior when calling the method from the body of the case class (always inside a fork).
Currently, the algorithm only checks the start time of the operation, but the other version should be possible to implement modifying the actual one.
I'm not sure I understand how you got the deadlock, can you show it in code?
My idea was to start the whole RateLimiter inside a concurrency scope (requiring an Ox parameter), and there as part of the construction process start a fork which would handle the replenishing of permits
I've tried reproducing the deadlock again and surprisingly it works now. I don't remember doing anything differently, so I am not sure what caused the issue... From some debugging, it was caused by the Thread.sleep. Probably it was similar to what I've written now but had some small bug causing it. I'll update the branch to the correct version now.
def update: Unit =
  val now = System.nanoTime()
  lastUpdate.updateAndGet { time =>
according to the docs, updateAndGet should be side-effect free - here, we're manipulating the semaphores
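One way to restructure this, sketched under the assumption that the update boils down to advancing a timestamp and releasing permits: keep the atomic step pure (a CAS on the timestamp) and perform the semaphore side effect only in the single thread that won the CAS. Field names mirror the snippet above but the class is illustrative.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong

// updateAndGet's function may be retried under contention, so it must
// not touch the semaphore; here the CAS decides a unique winner, and
// only that thread releases a permit - exactly once.
class SideEffectFreeUpdate(periodNanos: Long):
  private val lastUpdate = new AtomicLong(System.nanoTime())
  val semaphore = new Semaphore(0)

  def update(): Unit =
    val now = System.nanoTime()
    val previous = lastUpdate.get()
    if now - previous >= periodNanos && lastUpdate.compareAndSet(previous, now) then
      semaphore.release() // side effect outside the atomic update function
```

If the CAS fails, another thread already advanced the timestamp and performed the release, so no permits are duplicated.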
Yes. I think we should also move the whole updating mechanism to GenericRateLimiter so that update doesn't need to be thread-safe, avoiding some atomic references in the algorithm implementations.
/** Blocks rejected operations until the rate limiter is ready to accept them or drops them depending on the chosen strategy.
  */
case class BlockOrDrop() extends Executor[Strategy.BlockOrDrop]:
this hybrid strategy looks weird ... maybe we should only pass the strategies as a parameter, instead of parametrizing the whole rate limiter with it?
I'm not sure what exactly you have in mind. Something like this would be equivalent but would also accept "bad" strategies:
case class GenericRateLimiter {
  def apply[T, Result[_], Returns[_[_]] <: Strategy[_]](
    operation: => T
  )(using Returns[Result]): Result[T]
}
Bad in what sense? We determine the strategy at RateLimiter's call site, no?
At the RateLimiter level there wouldn't be any problem, but the point of having GenericRateLimiter would be to allow customization by passing a Strategy, possibly one defined by the user. The user could use a strategy for which there is no corresponding executor. Parametrizing the GRL would make any Strategy not extending the Returns type a compile error.
It should be possible to pass the executor directly, although depending on the use it might create problems, e.g., if the user creates a custom executor with internal state and doesn't reuse the same executor in different calls to the rate limiter, or if different executors need some common internal state. It would also make it more difficult to pass a parameter customizing executor behaviour if there is some internal state that needs to be shared.
Ah, I see the problem. But this BlockOrDrop is fishy anyway. In its update call, you only call blockExecutor.update. Shouldn't the executor be somehow shared? What if the user calls .runBlocking and .runOrDrop interchangeably? Would be good to have a test which checks for such combinations. And this either needs simplification, or good docs explaining why it is done this way.
Actually, the BlockOrDrop executor at the moment just redirects to the appropriate executor depending on the strategy. I can expand the tests to cover more of the behaviour, but I am not really sure what kind of simplifications you have in mind. For this particular executor, I don't see any need for shared state (after simplifying there will be no internal state in any case). The internal state that checks whether an operation can be accepted is always in the RateLimiterAlgorithm, while the executor should only be concerned with how this information is used.
This will disappear after simplifying the updating, so the following is not important but might provide context. BlockOrDrop only called the block updater because the drop updater didn't do anything. A common method to update rate limiters is when they receive a call, so there are no background threads involved. The problem is that this only works for the drop executor, while the blocking one needs some kind of queueing mechanism and thus background updating. Although in the case of fair blocking, it might introduce unfairness to BlockOrDrop if there is no shared state.
/** Limits the rate of execution of the given operation with a custom Result type
  */
def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
  executor.schedule(algorithm, operation)
is the schedule/execute distinction needed? can't it be combined in a single method call?
Technically yes, but it seems better organized to me that way. If updating is now done by the GenericRateLimiter, I think we need to pass a semaphore to allow locking and unlocking of the updater, so we would need both.
val waitTime = lastUpdate.get() + per.toNanos - System.nanoTime()
val q = semaphore.getQueueLength()
if waitTime > 0 then waitTime
else if q > 0 then per.toNanos
It hung before, but I think it might be better to return an Option[Long] here so we can differentiate between no updating (None), updating only once (Some(0L)), and continuing to update.
but do we ever stop updating, if it's done in a background process?
It could be possible to only schedule when needed: for example, if there are no calls surpassing the rate in 10 minutes and it updates each minute, we could update after 10 minutes, when the rate is surpassed. But if there is a thread anyway, starting one only when needed doesn't gain us much. Probably better to just schedule always.
yeah, I think that optimization wouldn't save much. Let's simplify and schedule always
I think the implementation could use some cleanup and a review of correctness (especially when it comes to concurrency) after all the changes. It takes a lot of time to review, and there are still some significant issues.
*/
case class Block() extends Executor[Strategy.Blocking]:
val updateLock = new Semaphore(0)
is the update lock needed? we're always starting update as a background process in a fork, no? and updating only from that fork
I don't think there is a way to avoid two semaphores: one is needed to block and unblock the updater so that all performed updates are really needed. The other one is there to avoid race conditions when giving permits, and to avoid giving more than one.
Although if we just let the updater run in the background whether it's needed or not, it would simplify the code, also for downstream users implementing their own algorithm. What do you think?
Ah ... I thought the updater is always run in the background. What's the scenario for not running it in the background?
But this simplification sounds good, RateLimiter needs the Ox capability anyway
Thank you for your time reviewing this! I think that some of these changes might simplify the code.
I've separated the updating mechanism from the executors, added details and examples in the docs about the use of Strategies and Executors and how to customize algorithms and executors, and checked the algorithm API. There is also a new test at the end of

Finally, I've merged the token bucket and leaky bucket implementations into one, as they were very similar, and added the possibility of acquiring an arbitrary number of permits in the

All this should be independent from the final aspect of the
def update: Unit =
  val now = System.nanoTime()
  // retrieving current queue to append it later if some elements were added concurrently
  val q = log.getAndUpdate(_ => new LinkedList[(Long, Int)]())
here the log becomes empty for some time, allowing operations to be started, even if that would exceed the rate limit?
operations are started if the semaphore allows it, so the log is unrelated to that. Once the log is processed, permits will be restored and the semaphore will allow new operations depending on how many are being restored.
// adds timestamp to log
val now = System.nanoTime()
log.updateAndGet { q =>
  q.add((now, permits))
shouldn't we use an immutable data structure here, as updateAndGet can be called multiple times?
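The immutable alternative suggested here could look like this (class and method names are illustrative). Since `updateAndGet` may invoke its function several times under contention, each attempt must build a fresh value; an immutable `Queue` makes retries harmless, whereas adding to a shared mutable `LinkedList` would duplicate entries on retry.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.Queue

// A log of (timestamp, permits) entries kept in an immutable Queue,
// updated via a pure function so CAS retries cannot corrupt it.
class TimestampLog:
  private val log = new AtomicReference(Queue.empty[(Long, Int)])

  def record(permits: Int): Unit =
    val now = System.nanoTime()
    log.updateAndGet(q => q.enqueue((now, permits))) // pure function of q
    ()

  def entries: Queue[(Long, Int)] = log.get()
```

Each retry re-runs `q.enqueue(...)` on the latest snapshot, producing exactly one new entry regardless of contention.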
def update: Unit =
  val now = System.nanoTime()
  lastRefillTime.set(now)
  if semaphore.availablePermits() < rate then semaphore.release()
so the difference with FixedRate is that we always release 1 permit?
That's it, and permits are accumulated if not used
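The refill behaviour described here - one permit per tick, accumulating up to the bucket size - can be isolated in a small sketch (illustrative class, not the PR's code):

```scala
import java.util.concurrent.Semaphore

// Token-bucket refill: each tick releases a single permit, and unused
// permits accumulate up to `rate` (the bucket capacity), unlike a fixed
// rate window which resets all at once.
class TokenBucketRefill(rate: Int):
  private val semaphore = new Semaphore(rate)

  def refillTick(): Unit =
    // cap accumulation at the bucket size
    if semaphore.availablePermits() < rate then semaphore.release()

  def tryAcquire(): Boolean = semaphore.tryAcquire()
```

A burst can drain the accumulated permits at once, after which callers are paced by one permit per tick.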
case class Drop() extends Executor[Strategy.Dropping]:
  def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Dropping[Result[*]]): Result[T] =
    if algorithm.tryAcquire then cfg.run(operation)
    else None.asInstanceOf[Result[T]]
I'm not sure if we're not trying to be overly flexible here. Drop on one hand seems to work with any result type, but in practice requires an Option (because of the cast here). Maybe the executor should simply have a fixed return type (Block - identity, Drop - Option). Would we lose any flexibility then?
I think the main problem then is to integrate them easily with GenericRateLimiter. If we are going for fixed return types, I would put all the logic inside RateLimiter, because otherwise it's just wrapping of logic.
In the recent push I've deleted the GRL and Executor classes and moved the updating logic to RateLimiter. If any user wants to customize how the algorithm is manipulated, the easiest way would be to create their own interface. I've also updated the docs and tests.
q.dequeueOption match
  case None => q
  case Some((head, tail)) =>
    if semaphore.availablePermits() < rate && head._1 + per.toNanos < now then
is it at all possible that head._1 + per.toNanos < now (the oldest entry should be released), but there are more permits available than rate? In other words, is the first part of this condition necessary?
Actually no, thank you. I've just changed it!
I added some final polishing, and we are done - thanks! :)
Great! Thank you for your time reviewing this, really appreciate it! |
Implements a customizable rate limiter. The behaviour depends on a RateLimiterConfig built from a BlockingPolicy and a RateLimiterAlgorithm. BlockingPolicy should deal exclusively with the response to rejected operations, while RateLimiterAlgorithm must control only whether an operation can be accepted or not.

Currently, there are two blocking policies: Block and Drop.

Block: If the algorithm gets blocked, new operations will be queued so that when the algorithm gets unblocked, these operations will be processed first.
Drop: Operations passed to the rate limiter when the algorithm is blocked will be discarded.

There are also 4 algorithm implementations: fixed rate, sliding window, leaky bucket and token bucket.

Both BlockingPolicy and RateLimiterAlgorithm present an interface (which I hope is not confusing) that makes it very easy to implement new behaviour. If the guidelines for implementation are followed, things like throttling operations or blocking a particular number and discarding thereafter should be very easy to build.

Tests include behaviour for all the 8 different combinations and also test the behaviour in a concurrent context.
/claim #120
fixes #120