In #3642 an issue was reported and fixed so that pending task samples are stopped whenever scheduling of the task is rejected. In addition, we noticed in one of our applications another case where pending task samples can pile up: when a scheduled task is cancelled before it runs. This is a common case for users of the `timeout` operator, which submits a delayed task representing the timeout. If the reactive chain completes before the timeout is hit, that delayed task is cancelled, but the pending sample is not stopped. As a result, frequent use of the `timeout` operator on an instrumented scheduler can quickly pile up pending samples.
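To illustrate the mechanism without Reactor or Micrometer on the classpath, here is a JDK-only analog. It is a sketch under stated assumptions, not `TimedScheduler`'s actual code: an `AtomicInteger` stands in for the pending `LongTaskTimer`, the hypothetical `wrap` helper starts a "sample" at submission and stops it only when the task starts running, and cancelling the `ScheduledFuture` plays the role of the `timeout` operator disposing its delayed task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingLeakDemo {

    // Hypothetical wrapper: the "pending sample" (a counter here) starts at submission
    // and is only stopped once the task actually starts running.
    static Runnable wrap(Runnable task, AtomicInteger pending) {
        pending.incrementAndGet();           // sample started
        return () -> {
            pending.decrementAndGet();       // sample stopped when the task begins
            task.run();
        };
    }

    // Schedules `count` far-future tasks and cancels each one, like a timeout that never fires.
    // Returns how many pending "samples" are still active afterwards.
    public static int simulateTimeouts(int count) {
        AtomicInteger pending = new AtomicInteger();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            for (int i = 0; i < count; i++) {
                ScheduledFuture<?> timeout =
                        executor.schedule(wrap(() -> {}, pending), 10_000, TimeUnit.SECONDS);
                timeout.cancel(false);       // the chain completed first: the task never runs
            }
            return pending.get();            // nothing ever stopped the cancelled samples
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "pending after cancellations: 100"
        System.out.println("pending after cancellations: " + simulateTimeouts(100));
    }
}
```

Because the only place the counter is decremented is inside the task body, every cancelled task leaves its sample behind, which is the same shape as the leak described above.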
Expected Behavior
Pending task samples of the `TimedScheduler` should be stopped when the task is cancelled/disposed.
Actual Behavior
The pending task sample is never stopped, so it remains an active task of the pending `LongTaskTimer` and such samples accumulate, creating a memory leak.
Steps to Reproduce
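```java
package reactor.core.observability.micrometer; // TimedScheduler is package-private

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.search.RequiredSearch;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;

class TimedSchedulerPendingTaskTest {
    @Test
    void pendingTaskRemovedOnCancellation() {
        ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        Scheduler original = Schedulers.fromExecutorService(executorService);
        MeterRegistry registry = new SimpleMeterRegistry();
        TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty());
        testScheduler.init();
        RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending");
        LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();
        try {
            // Schedule a task far in the future.
            Disposable.Swap waitingTask = Disposables.swap();
            assertThatNoException().isThrownBy(() -> waitingTask.update(testScheduler.schedule(() -> {}, 10_000, TimeUnit.SECONDS)));
            // It's pending to be scheduled.
            assertThat(longTaskTimer.activeTasks()).as("active pending")
                    .isOne();
            // E.g. a `Mono#timeout` was never hit, so the task gets disposed.
            waitingTask.dispose();
            // The task should no longer be considered pending, as it was disposed.
            assertThat(longTaskTimer.activeTasks()).as("active pending")
                    .isZero();
        } finally {
            testScheduler.disposeGracefully().block(Duration.ofSeconds(1));
        }
    }
}
```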
The final assertion fails with the current implementation.
Possible Solution
Wrapping the `Disposable` returned by the delegate's `schedule` call, so that disposing it also stops the pending sample, could work. Something like this (kept short here; it should be applied to all code paths):
```diff
diff --git a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
index 83b261ef4..f588b7e23 100644
--- a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
+++ b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
@@ -105,7 +105,7 @@ final class TimedScheduler implements Scheduler {
         TimedRunnable timedTask = wrap(task);
         try {
-            return delegate.schedule(timedTask, delay, unit);
+            return new TimedDisposable(delegate.schedule(timedTask, delay, unit), timedTask.pendingSample::stop);
         }
         catch (RejectedExecutionException exception) {
             timedTask.pendingSample.stop();
@@ -199,6 +199,22 @@ final class TimedScheduler implements Scheduler {
         }
     }
+    static final class TimedDisposable implements Disposable {
+        final Disposable disposable;
+        final Runnable stopSample;
+
+        public TimedDisposable(Disposable disposable, Runnable stopSample) {
+            this.disposable = disposable;
+            this.stopSample = stopSample;
+        }
+
+        @Override
+        public void dispose() {
+            disposable.dispose();
+            stopSample.run();
+        }
+    }
+
     static final class TimedRunnable implements Runnable {
         final MeterRegistry registry;
```
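A slightly fuller sketch of the same idea, kept JDK-only so it runs standalone. `Disposable` here is a minimal local stand-in mirroring `reactor.core.Disposable` (whose `isDisposed()` defaults to `false`), and `PendingSample` and `schedule` are hypothetical substitutes for Micrometer's `LongTaskTimer.Sample` and the delegate scheduler. Compared with the diff, it also forwards `isDisposed()` to the wrapped `Disposable` (otherwise the wrapper always reports `false`) and guards `stop()` with an `AtomicBoolean`, so disposing after the task has already started, or disposing twice, stops the sample at most once rather than relying on how the real sample handles a second `stop()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedDisposableSketch {

    // Minimal stand-in mirroring reactor.core.Disposable: isDisposed() defaults to false.
    interface Disposable {
        void dispose();
        default boolean isDisposed() { return false; }
    }

    // Hypothetical pending sample: counts itself as active until stopped, at most once.
    static final class PendingSample {
        private final AtomicBoolean stopped = new AtomicBoolean();
        private final AtomicInteger active;

        PendingSample(AtomicInteger active) {
            this.active = active;
            active.incrementAndGet();
        }

        void stop() {
            if (stopped.compareAndSet(false, true)) {
                active.decrementAndGet();
            }
        }
    }

    // Disposing the wrapper cancels the delegate and stops the pending sample;
    // isDisposed() is forwarded instead of falling back to the interface default.
    static final class TimedDisposable implements Disposable {
        final Disposable disposable;
        final PendingSample pendingSample;

        TimedDisposable(Disposable disposable, PendingSample pendingSample) {
            this.disposable = disposable;
            this.pendingSample = pendingSample;
        }

        @Override
        public void dispose() {
            disposable.dispose();
            pendingSample.stop();
        }

        @Override
        public boolean isDisposed() {
            return disposable.isDisposed();
        }
    }

    // Hypothetical delegate: schedules a delayed task and wraps the resulting future.
    static Disposable schedule(ScheduledExecutorService executor, AtomicInteger active,
                               Runnable task, long delay, TimeUnit unit) {
        PendingSample sample = new PendingSample(active);
        ScheduledFuture<?> future = executor.schedule(() -> {
            sample.stop(); // the task started running, so it is no longer pending
            task.run();
        }, delay, unit);
        Disposable delegate = new Disposable() {
            @Override public void dispose() { future.cancel(false); }
            @Override public boolean isDisposed() { return future.isCancelled() || future.isDone(); }
        };
        return new TimedDisposable(delegate, sample);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger active = new AtomicInteger();
        try {
            Disposable timeout = schedule(executor, active, () -> {}, 10_000, TimeUnit.SECONDS);
            System.out.println("active before dispose: " + active.get()); // prints 1
            timeout.dispose(); // e.g. the chain completed before the timeout fired
            System.out.println("active after dispose: " + active.get());  // prints 0
            System.out.println("isDisposed: " + timeout.isDisposed());    // prints true
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The same guard matters in the real scheduler because `dispose()` can be called after the task has already started, at which point the sample was stopped by the task itself.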
Happy to file a PR if we agree on this solution.
Your Environment
- Reactor version(s) used: 3.6.2
- JVM version (`java -version`): OpenJDK 64-Bit Server VM GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08, mixed mode, sharing)
- OS and version (e.g. `uname -a`): Darwin MacBook-Pro-9.local 22.6.0 Darwin Kernel Version 22.6.0: Wed Jul 5 22:21:56 PDT 2023; root:xnu-8796.141.3~6/RELEASE_X86_64 x86_64