TimedScheduler leaks cancelled pending task samples #3697

@nathankooij

Description

In #3642 an issue was reported and solved so that pending task samples get stopped whenever a task's scheduling is rejected. We have since noticed in one of our applications another case where pending task samples pile up: when the scheduled task is cancelled before it runs. This is quite common for users of the timeout operator, which submits a delayed task representing the timeout. If the reactive chain completes before the timeout is hit, the delayed task is cancelled, but the pending sample is not stopped. Frequent use of the timeout operator on an instrumented scheduler can therefore easily accumulate unstopped samples.

Expected Behavior

Pending task samples for the TimedScheduler should get stopped when the task is cancelled/disposed.

Actual Behavior

The pending task sample is never stopped, and thus a memory leak is created.

Steps to Reproduce

	@Test
	void pendingTaskRemovedOnCancellation() {
		ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
		Scheduler original = Schedulers.fromExecutorService(executorService);
		MeterRegistry registry = new SimpleMeterRegistry();
		TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty());
		testScheduler.init();

		RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending");
		LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();

		try {
			// Schedule a task far in the future.
			Disposable.Swap waitingTask = Disposables.swap();
			assertThatNoException().isThrownBy(() -> waitingTask.update(testScheduler.schedule(() -> {}, 10_000, TimeUnit.SECONDS)));

			// It's pending to be scheduled.
			assertThat(longTaskTimer.activeTasks()).as("active pending")
					.isOne();

			// E.g. a `Mono#timeout` was never hit, so the task gets disposed.
			waitingTask.dispose();

			// The task should no longer be considered pending, as it was disposed.
			assertThat(longTaskTimer.activeTasks()).as("active pending")
					.isZero();
		} finally {
			testScheduler.disposeGracefully().block(Duration.ofSeconds(1));
		}
	}

The final assertion fails with the current implementation.

Possible Solution

Wrapping the Disposable returned by the delegate's schedule call, so that disposing it also stops the pending sample, could work. Something like the following (kept short here; it should be applied to all scheduling code paths):

diff --git a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
index 83b261ef4..f588b7e23 100644
--- a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
+++ b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java
@@ -105,7 +105,7 @@ final class TimedScheduler implements Scheduler {
 		TimedRunnable timedTask = wrap(task);
 
 		try {
-			return delegate.schedule(timedTask, delay, unit);
+			return new TimedDisposable(delegate.schedule(timedTask, delay, unit), timedTask.pendingSample::stop);
 		}
 		catch (RejectedExecutionException exception) {
 			timedTask.pendingSample.stop();
@@ -199,6 +199,22 @@ final class TimedScheduler implements Scheduler {
 		}
 	}
 
+	static final class TimedDisposable implements Disposable {
+		final Disposable disposable;
+		final Runnable stopSample;
+
+		public TimedDisposable(Disposable disposable, Runnable stopSample) {
+			this.disposable = disposable;
+			this.stopSample = stopSample;
+		}
+
+		@Override
+		public void dispose() {
+			disposable.dispose();
+			stopSample.run();
+		}
+	}
+
 	static final class TimedRunnable implements Runnable {
 
 		final MeterRegistry registry;
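
To make the intended behaviour easier to reason about outside of Reactor, here is a minimal standalone sketch of the same idea using only the JDK. It is not Reactor or Micrometer code: `PendingSampleSketch`, `PendingSample`, `Handle` and `activePending` are hypothetical names standing in for the scheduler, the pending sample, the returned Disposable and the long task timer's active count. The sketch assumes the sample should be stopped at most once, whichever of "task starts", "task is cancelled" or "scheduling is rejected" happens first.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone model of the proposal; every name here is hypothetical.
final class PendingSampleSketch {

	// Stand-in for the pending timer: tasks scheduled but not yet started.
	static final AtomicInteger activePending = new AtomicInteger();

	// Stand-in for a pending sample; stop() takes effect at most once.
	static final class PendingSample {
		private final AtomicBoolean stopped = new AtomicBoolean();

		PendingSample() {
			activePending.incrementAndGet();
		}

		void stop() {
			if (stopped.compareAndSet(false, true)) {
				activePending.decrementAndGet();
			}
		}
	}

	// Stand-in for the wrapped Disposable: cancelling also stops the sample.
	static final class Handle {
		private final ScheduledFuture<?> future;
		private final PendingSample sample;

		Handle(ScheduledFuture<?> future, PendingSample sample) {
			this.future = future;
			this.sample = sample;
		}

		void dispose() {
			future.cancel(false);
			sample.stop();
		}

		boolean isDisposed() {
			return future.isCancelled();
		}
	}

	static Handle schedule(ScheduledExecutorService executor, Runnable task, long delay, TimeUnit unit) {
		PendingSample sample = new PendingSample();
		Runnable timed = () -> {
			sample.stop(); // the task has started, so it is no longer pending
			task.run();
		};
		try {
			return new Handle(executor.schedule(timed, delay, unit), sample);
		}
		catch (RejectedExecutionException exception) {
			sample.stop(); // rejected tasks never become pending
			throw exception;
		}
	}

	public static void main(String[] args) {
		ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
		try {
			Handle handle = schedule(executor, () -> {}, 10_000, TimeUnit.SECONDS);
			System.out.println("pending after schedule: " + activePending.get()); // 1
			handle.dispose();
			handle.dispose(); // a second dispose must not decrement again
			System.out.println("pending after dispose: " + activePending.get()); // 0
		}
		finally {
			executor.shutdownNow();
		}
	}
}
```

In this model the wrapper also forwards `isDisposed()` to the underlying handle. A real `TimedDisposable` would probably want to do the same, since `Disposable#isDisposed` is a default method that returns `false` unless overridden.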

Happy to file a PR if we agree on this solution.

Your Environment

  • Reactor version(s) used: 3.6.2
  • JVM version (java -version): OpenJDK 64-Bit Server VM GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08, mixed mode, sharing)
  • OS and version (eg uname -a): Darwin MacBook-Pro-9.local 22.6.0 Darwin Kernel Version 22.6.0: Wed Jul 5 22:21:56 PDT 2023; root:xnu-8796.141.3~6/RELEASE_X86_64 x86_64
