core: SynchronizationContext exposed by LoadBalancer.Helper#4971
zhangkun83 merged 9 commits into grpc:master from
Conversation
```java
 * submitted.
 */
public final ScheduledContext scheduleNow(Runnable task) {
  return schedule(task, 0, TimeUnit.NANOSECONDS);
```
I'm not excited about making an easy way to make a zero-delay task. This just abuses the scheduled executor and is a strong code smell to me.
I can instead abstract it and make schedule() call scheduleNow() when delay <= 0. Is it better?
No. It would be surprising for a task to suddenly run in the current thread when the delay is 0. They are fundamentally different.
It's the same as the current runSerialized(). I still don't understand the issue.
I'm not sure what you're saying is the same. Today runSerialized() runs on the current thread:
https://github.com/grpc/grpc-java/blob/v1.15.0/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L1236-L1238
And any schedule() would run on a separate thread. I'm against having schedule() turn into running on the current thread based on the timeout.
Fair enough. I have decoupled schedule() from scheduleNow().
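The distinction the reviewer is insisting on can be illustrated with a simplified sketch (this is not the grpc-java source; names and structure are invented for illustration): execute-style submission serializes tasks and may run them inline on the calling thread, while schedule() always hands the task to an executor thread, even for a zero delay, so threading never depends on the timeout value.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch of the inline-serialized entry point. A real
// schedule() would instead submit to a ScheduledExecutorService and
// never run the task on the calling thread.
final class MiniSyncContext {
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
  private final AtomicReference<Thread> drainer = new AtomicReference<>();

  /** Enqueues the task and drains; the task may run inline. */
  void execute(Runnable task) {
    queue.add(task);
    drain();
  }

  private void drain() {
    while (!queue.isEmpty()) {
      // Only one thread drains at a time; a reentrant call from inside a
      // running task fails this CAS and returns immediately, which is
      // what gives the non-reentrancy guarantee.
      if (!drainer.compareAndSet(null, Thread.currentThread())) {
        return;
      }
      try {
        Runnable task;
        while ((task = queue.poll()) != null) {
          task.run();
        }
      } finally {
        drainer.set(null);
      }
    }
  }
}
```

A task submitted from inside another task is queued rather than run reentrantly, so the outer task always finishes first.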
```java
/**
 * Returns the current time in nanos from the same clock that {@link #schedule} uses.
 */
public abstract long currentTimeNanos();
```
This is a weird API to expose, since it will agree with neither currentTimeMillis nor nanoTime. Based on the documentation it would appear to be similar to nanoTime(), but the actual implementation uses an epoch of 1970 like currentTimeMillis, except when currentTimeMillis and nanoTime get out of sync. It seems this should just be nanoTime().
(I don't care really if it has a different offset than nanoTime(), but aligning it to 1970 seems like a bad idea since it can't be guaranteed to align with 1970.)
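The mismatch the reviewer describes can be shown concretely (this snippet is an illustration, not part of the patch): System.nanoTime() has an arbitrary, JVM-chosen origin, so a nanosecond-precision value aligned to the 1970 epoch cannot be derived from it reliably, and only differences between nanoTime() readings are meaningful.

```java
import java.util.concurrent.TimeUnit;

// Two clocks that cannot be forced to agree: a 1970-epoch clock with
// millisecond resolution, and a monotonic clock with an arbitrary origin.
final class ClockComparison {
  /** Millis-derived "epoch nanos": 1970 origin, millisecond resolution. */
  static long epochNanos() {
    return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
  }

  /** Monotonic elapsed time; only differences of nanoTime() are meaningful. */
  static long elapsedNanos(long startNanoTime) {
    return System.nanoTime() - startNanoTime;
  }
}
```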
```java
/**
 * Schedules a task to run as soon as poassible.
 *
 * <p>Non-reentrency is guaranteed. Although task may run inline, but if this method is called
```
s/Although task may run inline, but if/If/ . That seems more clear.
```java
public abstract ScheduledContext scheduleNow(Runnable task);
```

```java
/**
 * Schedules a task to be run after a delay. Unlike {@link #scheduleNow}, the task will typically
```
The "typically" is hard to reason about. Could we just say, "Unlike {@link #scheduleNow}, will never be run inline."? (Or maybe even, just "Will never be run inline.")
Also, make it a semi-concrete class, and have it absorb ChannelExecutor. TODO: unit tests for the new methods on SynchronizationContext.
```diff
-  private final PriorityBlockingQueue<ScheduledTask> tasks =
-      new PriorityBlockingQueue<ScheduledTask>();
+  // Must keep the ordering of tasks as they are required by ControlPlaneScheduler.scheduleNow().
+  private final LinkedBlockingQueue<ScheduledTask> tasks = new LinkedBlockingQueue<>();
```
Can we simply use two queues instead? One for pending (ready to be executed) tasks and one for scheduled (for a future time) tasks? We'd keep the previous PriorityBlockingQueue and then just add a LinkedBlockingQueue for execute(). That more closely matches what would happen in practice and makes the code more clear.
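The two-queue suggestion could look roughly like this (a simplified sketch of a fake-clock scheduler; the class and field names are illustrative, not the actual FakeClock code): due tasks keep FIFO order in a LinkedBlockingQueue, while future tasks stay time-ordered in a PriorityBlockingQueue until their deadline passes.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

final class TwoQueueScheduler {
  static final class ScheduledTask implements Comparable<ScheduledTask> {
    final long dueTimeNanos;
    final Runnable command;

    ScheduledTask(long dueTimeNanos, Runnable command) {
      this.dueTimeNanos = dueTimeNanos;
      this.command = command;
    }

    @Override public int compareTo(ScheduledTask o) {
      return Long.compare(dueTimeNanos, o.dueTimeNanos);
    }
  }

  // Ready-to-run tasks in submission order.
  private final LinkedBlockingQueue<ScheduledTask> dueTasks = new LinkedBlockingQueue<>();
  // Future tasks ordered by due time.
  private final PriorityBlockingQueue<ScheduledTask> scheduledTasks = new PriorityBlockingQueue<>();
  private long currentTimeNanos;

  /** Zero-delay work goes straight to the FIFO queue. */
  void execute(Runnable command) {
    dueTasks.add(new ScheduledTask(currentTimeNanos, command));
  }

  void schedule(Runnable command, long delayNanos) {
    scheduledTasks.add(new ScheduledTask(currentTimeNanos + delayNanos, command));
  }

  /** Advances the fake clock and runs everything that has become due. */
  void forwardNanos(long nanos) {
    currentTimeNanos += nanos;
    ScheduledTask head;
    while ((head = scheduledTasks.peek()) != null && head.dueTimeNanos <= currentTimeNanos) {
      dueTasks.add(scheduledTasks.poll());
    }
    ScheduledTask due;
    while ((due = dueTasks.poll()) != null) {
      due.command.run();
    }
  }
}
```

Keeping the queues separate means zero-delay tasks never compete on due time with one another, so their FIFO order is preserved by construction.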
```java
 *
 * <p>The default implementation logs a warning.
 */
protected void handleUncaughtThrowable(Throwable t) {
```
Nit: the class could be made final and be passed a Thread.UncaughtExceptionHandler (with a note that the thread will not die after executing the handler, which is different from its documentation).
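A minimal sketch of the suggested shape (simplified and single-threaded; not the grpc-java source): a final class that delegates task failures to a caller-supplied Thread.UncaughtExceptionHandler instead of an overridable method. Note how, unlike the handler's usual contract, the draining thread keeps running after the handler returns.

```java
import java.util.ArrayDeque;

final class HandlerDrivenContext {
  private final Thread.UncaughtExceptionHandler handler;
  private final ArrayDeque<Runnable> queue = new ArrayDeque<>();

  HandlerDrivenContext(Thread.UncaughtExceptionHandler handler) {
    this.handler = handler;
  }

  void executeLater(Runnable task) {
    queue.add(task);
  }

  void drain() {
    Runnable task;
    while ((task = queue.poll()) != null) {
      try {
        task.run();
      } catch (Throwable t) {
        // Report and continue: the current thread survives, which is why
        // the behavior difference from UncaughtExceptionHandler's own
        // documentation deserves a note.
        handler.uncaughtException(Thread.currentThread(), t);
      }
    }
  }
}
```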
```java
/**
 * Enqueues a task that will be run when {@link #drain} is called.
 */
public final void executeLater(Runnable runnable) {
```
It might be good to point out this is useful for adding things from within a lock and then calling drain outside the lock.
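The lock-then-drain pattern being described could be sketched like this (an illustrative, self-contained stand-in; the class and method names are invented for the example): enqueue the notification while holding the lock, so ordering matches the state change, but run it only after the lock is released, so listener code never executes with the lock held.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class StateHolder {
  private final Object lock = new Object();
  private final Queue<Runnable> syncQueue = new ArrayDeque<>();
  private int state;

  void executeLater(Runnable r) {
    syncQueue.add(r);
  }

  void drain() {
    Runnable r;
    while ((r = syncQueue.poll()) != null) {
      r.run();
    }
  }

  void setState(int newState, Runnable listener) {
    synchronized (lock) {
      state = newState;
      // Queue the notification inside the lock so it observes the
      // ordering of state changes...
      executeLater(listener);
    }
    // ...but run it outside the lock, avoiding deadlocks and
    // lock-ordering problems if the listener calls back in.
    drain();
  }

  int getState() {
    synchronized (lock) {
      return state;
    }
  }
}
```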
zhangkun83 left a comment
Thanks @ejona86. All comments are addressed.
```java
 * what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
 * not terminated when the handler is called.
 */
public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
```
We could provide a zero-arg version that just logs by default. But we can do that at any time. This seems fine for now.
Provides a `SynchronizationContext` for scheduling tasks, with and without delay, from LoadBalancer implementations. This absorbs and extends the internal utility `ChannelExecutor`. It supersedes `Helper.runSerialized()`, which is now deprecated.

Motivation
I see multiple cases that schedule tasks with a delay while requiring the task to run in the "Channel Executor". There has been repeated work to wrap scheduled tasks and handle races between cancellation and task run (see the diff in `GrpclbState.java` for example). The LoadBalancer implementation (e.g., GrpclbLoadBalancer) also has to acquire the `ScheduledExecutorService` from somewhere and release it upon shutdown.

The upcoming HealthCheckLoadBalancer (#4932), which would use a back-off policy to retry health-checking streams, would have to do all of the above. At this point I think we need to provide something that combines `runSerialized()` with a scheduled executor, with the same synchronization guarantees.

Design details
`SynchronizationContext` is similar to `ScheduledExecutorService` but tailored for use in `LoadBalancer` and potentially other cases outside of `LoadBalancer`. It offers task queuing, serialization, and delayed scheduling. It guarantees non-reentrancy and happens-before among tasks. It owns no thread, but runs tasks on the caller's or caller-provided threads.

All channel-level state mutations and callback methods on `LoadBalancer` are done in a SynchronizationContext, which was previously referred to as the "Channel Executor".

`SynchronizationContext.schedule()` returns a `ScheduledHandle` for status checking and cancellation. `ScheduledFuture` from `ScheduledExecutorService.schedule()` is too broad for our use cases (e.g., the blocking `get()` should never be used).

`SynchronizationContext.schedule()` requires a `ScheduledExecutorService`, which is now available through `Helper.getScheduledExecutorService()`. LoadBalancers don't need to worry about where to get a `ScheduledExecutorService` any more.

Alternatives
Alternatively, we could keep `Helper.runSerialized()` and add something like `Helper.runSerializedWithDelay()`, but having them on their own interface allows a clean fake implementation by `FakeClock` for tests, and allows other components (potentially `InternalSubchannel` for reconnection backoff) to use it too.

Instead of asking the caller of `schedule()` to provide the `ScheduledExecutorService`, we considered having SynchronizationContext take a `ScheduledExecutorService` at construction. That would be inconvenient for LoadBalancer implementations that don't use `schedule()`, as they would be forced to provide a fake `ScheduledExecutorService` (which is cumbersome).

Instead of making `SynchronizationContext` a (semi-)concrete class, we considered making it a pure abstract class. However, we found it nontrivial to implement `execute()` correctly with the non-reentrancy guarantee.
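The ScheduledHandle idea from the design section, a narrow handle that supports only cancellation and a pending check while hiding the blocking `get()`, can be sketched roughly as follows (a simplified stand-in, not the grpc-java source; names are illustrative):

```java
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Wraps the executor's Future behind a minimal surface. Callers can
// cancel or ask whether the task is still pending, but cannot block on
// the result.
final class MiniScheduledHandle {
  private final Future<?> future;

  private MiniScheduledHandle(Future<?> future) {
    this.future = future;
  }

  static MiniScheduledHandle schedule(
      ScheduledExecutorService timer, Runnable task, long delay, TimeUnit unit) {
    return new MiniScheduledHandle(timer.schedule(task, delay, unit));
  }

  /** Cancels without blocking; a task already running is not interrupted. */
  void cancel() {
    future.cancel(false);
  }

  /** True if the task has neither run to completion nor been cancelled. */
  boolean isPending() {
    return !future.isDone() && !future.isCancelled();
  }
}
```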