Decorating the Reactor schedulers with TimedScheduler leads to very high CPU usage in some of our production services.
I'm not entirely sure what the problem is, but I noticed that samples in the TimedScheduler are sometimes not cleaned up.
Expected Behavior
When a sample is started for the metrics, I expect it to be stopped again.
Actual Behavior
Sometimes, this does not seem to happen.
In particular, I'm looking at the TimedScheduler's pendingTasks timer.
When a TimedRunnable is instantiated, it starts the sample: this.pendingSample = parent.pendingTasks.start();
In both the run method and the dispose method, it stops it again.
Internally, calling stop on SampleImpl removes the sample from the list of active tasks.
In my test runs, pendingTasks sometimes still reports active tasks after a batch of Monos has completed.
In other words, this fails:
// run some Monos and subscribe to them (see Steps to Reproduce below)

// read the private pendingTasks timer of the TimedScheduler via reflection
Field field = scheduler.getClass().getDeclaredField("pendingTasks");
field.setAccessible(true);
DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
assertEquals(0, pendingTasks.activeTasks());
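The sketch below makes the invariant behind the expected behavior concrete with a minimal plain-Java model of the start/stop pattern described above. Tracker, Sample and TrackedRunnable are hypothetical stand-ins for the pendingTasks timer, SampleImpl and TimedRunnable. They are not the actual Reactor or Micrometer classes. The model assumes only what is stated here: the sample is started in the constructor and stopped in run() and in dispose(). Under that assumption, an instance that is created but never run or disposed keeps its sample active.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, not Reactor's or Micrometer's code: Tracker plays the
// role of the pendingTasks long task timer, Sample the role of SampleImpl.
public class PendingTasksModel {

    static final class Tracker {
        private final Set<Sample> active = ConcurrentHashMap.newKeySet();

        Sample start() {
            Sample s = new Sample(this);
            active.add(s);
            return s;
        }

        int activeTasks() {
            return active.size();
        }
    }

    static final class Sample {
        private final Tracker owner;

        Sample(Tracker owner) {
            this.owner = owner;
        }

        // Removing from the set is idempotent, so calling stop() from both
        // run() and dispose() is harmless in this model.
        void stop() {
            owner.active.remove(this);
        }
    }

    // Mirrors the TimedRunnable behavior as described in this report: the
    // sample starts in the constructor and is stopped in run() and dispose().
    static final class TrackedRunnable implements Runnable {
        private final Runnable task;
        private final Sample pendingSample;

        TrackedRunnable(Tracker tracker, Runnable task) {
            this.task = task;
            this.pendingSample = tracker.start();
        }

        @Override
        public void run() {
            pendingSample.stop();
            task.run();
        }

        void dispose() {
            pendingSample.stop();
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();

        new TrackedRunnable(tracker, () -> {}).run();     // stopped by run()
        new TrackedRunnable(tracker, () -> {}).dispose(); // stopped by dispose()
        new TrackedRunnable(tracker, () -> {});           // never run nor disposed

        // Only the third instance is still counted as pending.
        System.out.println("Pending tasks: " + tracker.activeTasks());
    }
}
```

In this model, a leftover active task means that some code path constructed a runnable without ever running or disposing it. This sketch does not establish whether that is what actually happens inside TimedScheduler.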
I also used my own implementation of TimedScheduler and overrode the finalize method of TimedRunnable. In it, I print a message if the runnable is garbage collected while its pendingSample has not been stopped:
@Override
protected void finalize() throws Throwable {
    super.finalize();
    if (this.pendingSample != null) {
        // duration(...) returns -1 once the sample has been stopped
        var duration = this.pendingSample.duration(TimeUnit.MICROSECONDS);
        if (duration != -1) {
            System.out.println("Pending task was not stopped");
        }
    }
}
(I'm not sure if this is a reliable way to detect this though)
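finalize() has been deprecated since Java 9, and finalizers run only when the garbage collector decides to run them. The sketch below performs the same check with java.lang.ref.Cleaner instead. LeakCheckedTask, State and LEAKS are hypothetical names used only for illustration. They are not part of Reactor's TimedScheduler, and stop() stands in for stopping pendingSample in run() or dispose(). The approach is still GC-dependent. The cleanup action runs only after the task becomes phantom reachable or clean() is called explicitly. So the absence of output does not prove that no sample leaked.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical leak detector; not part of Reactor or Micrometer.
public class LeakCheckedTask {

    private static final Cleaner CLEANER = Cleaner.create();

    // Counts tasks that were cleaned (by the GC or explicitly) while still pending.
    static final AtomicInteger LEAKS = new AtomicInteger();

    // The cleanup action must not reference the LeakCheckedTask itself,
    // otherwise the task could never become phantom reachable.
    private static final class State implements Runnable {
        final AtomicBoolean stopped = new AtomicBoolean();

        @Override
        public void run() {
            if (!stopped.get()) {
                LEAKS.incrementAndGet();
                System.out.println("Pending task was not stopped");
            }
        }
    }

    // Declared before cleanable so it is initialized first.
    private final State state = new State();
    final Cleaner.Cleanable cleanable = CLEANER.register(this, state);

    // Stand-in for stopping pendingSample in run() or dispose().
    void stop() {
        state.stopped.set(true);
    }

    public static void main(String[] args) {
        LeakCheckedTask stoppedTask = new LeakCheckedTask();
        stoppedTask.stop();
        stoppedTask.cleanable.clean(); // action runs, but the task was stopped

        LeakCheckedTask leakedTask = new LeakCheckedTask();
        leakedTask.cleanable.clean();  // action runs and reports the leak

        System.out.println("Leaks detected: " + LEAKS.get());
    }
}
```

Calling clean() explicitly runs the action at most once and unregisters it. This makes the check deterministic in a test. In production, the Cleaner thread runs the action whenever the object is collected.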
Steps to Reproduce
package org.example;

import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class Main {
    public static void main(String[] args) throws Exception {
        var scheduler = Micrometer.timedScheduler(
                Schedulers.newBoundedElastic(10, 10000, "boundedElastic"),
                new SimpleMeterRegistry(),
                "test");
        var totalCalls = new AtomicLong();
        var iterations = 50_000;

        // Each Mono either emits after 5 ms or times out after 20 ms; both paths count as a call.
        new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                Mono.delay(Duration.ofMillis(5))
                        .subscribeOn(scheduler)
                        .timeout(Duration.ofMillis(20), scheduler)
                        .subscribe(__ -> totalCalls.incrementAndGet(), __ -> totalCalls.incrementAndGet());
            }
        }).start();

        while (totalCalls.get() < iterations) {
            Thread.sleep(1000);
            System.out.printf("Progress: %.1f%%%n", 100d * totalCalls.get() / iterations);
        }
        scheduler.dispose();

        // Read the private pendingTasks timer of the TimedScheduler via reflection.
        Field field = scheduler.getClass().getDeclaredField("pendingTasks");
        field.setAccessible(true);
        DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
        System.out.println("Pending Tasks: " + pendingTasks.activeTasks());
        assertEquals(0, pendingTasks.activeTasks());
    }
}
Possible Solution
???
Your Environment
- io.projectreactor:reactor-core:3.6.8
- io.projectreactor:reactor-core-micrometer:1.1.8
- io.micrometer:micrometer-core:1.13.2
- M1 MacBook Pro
- Java 17