Unstopped samples in TimedScheduler #3844

@rethab

Decorating the Reactor schedulers with TimedScheduler leads to very high CPU usage in some of our production services.
I'm not entirely sure what the problem is, but I noticed that samples in the TimedScheduler are sometimes not cleaned up.

Expected Behavior

When a sample is started for the metrics, I expect it to be stopped again.

Actual Behavior

Sometimes, this does not seem to happen.

In particular, I'm looking at the TimedScheduler's pendingTasks timer.

When a TimedRunnable is instantiated, it starts the sample: this.pendingSample = parent.pendingTasks.start();
In both the run method and the dispose method, it stops it again.

Internally, calling stop on SampleImpl removes the sample from the list of active tasks.
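
To make the lifecycle described above concrete, here is a minimal plain-Java model of it. This is not the actual TimedScheduler or Micrometer code: ModelTimer, ModelSample, ModelTimedRunnable and PendingSampleModel are hypothetical names invented for illustration. The sketch only shows that a wrapper which starts its sample in the constructor leaves that sample active if it is neither run nor disposed. Whether that is what happens in reactor-core is an open question.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo entry point; none of these classes exist in Reactor or Micrometer.
class PendingSampleModel {
    // Returns the number of samples still active after three wrapper lifecycles.
    static int leakedAfterDemo() {
        ModelTimer pendingTasks = new ModelTimer();
        new ModelTimedRunnable(pendingTasks, () -> {}).run();     // executed: sample stopped
        new ModelTimedRunnable(pendingTasks, () -> {}).dispose(); // cancelled: sample stopped
        new ModelTimedRunnable(pendingTasks, () -> {});           // dropped: sample never stopped
        return pendingTasks.activeTasks();
    }

    public static void main(String[] args) {
        System.out.println("Active samples: " + leakedAfterDemo());
    }
}

// Simplified stand-in for a long task timer: it only tracks which samples are active.
class ModelTimer {
    private final Set<ModelSample> active = ConcurrentHashMap.newKeySet();

    ModelSample start() {
        ModelSample sample = new ModelSample(this);
        active.add(sample);
        return sample;
    }

    void remove(ModelSample sample) {
        active.remove(sample);
    }

    int activeTasks() {
        return active.size();
    }
}

// Stand-in for a sample; stopping it removes it from the timer's active set.
class ModelSample {
    private final ModelTimer timer;

    ModelSample(ModelTimer timer) {
        this.timer = timer;
    }

    void stop() {
        timer.remove(this); // removing twice is harmless, so stop() can be called repeatedly
    }
}

// Mirrors the described pattern: start the sample on construction, stop it in run() or dispose().
class ModelTimedRunnable implements Runnable {
    private final ModelSample pendingSample;
    private final Runnable task;

    ModelTimedRunnable(ModelTimer pendingTasks, Runnable task) {
        this.task = task;
        this.pendingSample = pendingTasks.start();
    }

    @Override
    public void run() {
        pendingSample.stop();
        task.run();
    }

    void dispose() {
        pendingSample.stop();
    }
}
```

In this model the timer holds a strong reference to every active sample, so a sample that is never stopped stays in the set even after its wrapper has been garbage collected.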

In my test run, I noticed that the set of pending tasks is sometimes not empty after running a bunch of Monos.

In other words, this fails:

// Run some Monos and subscribe to them (see "Steps to Reproduce" below)
Field field = scheduler.getClass().getDeclaredField("pendingTasks");
field.setAccessible(true);
DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
assertEquals(0, pendingTasks.activeTasks());

Additionally, I used my own copy of TimedScheduler and overrode the finalize method of TimedRunnable. There, I print a message if the runnable is garbage collected while its pendingSample has not been stopped:

@Override
protected void finalize() throws Throwable {
    super.finalize();
    if (this.pendingSample != null) {
        var duration = this.pendingSample.duration(TimeUnit.MICROSECONDS);
        if (duration != -1) {
            System.out.println("Pending task was not stopped");
        }
    }
}

(I'm not sure if this is a reliable way to detect this though)
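
On the reliability question: finalize has been deprecated since Java 9, and the JVM gives no guarantee about when, or whether, a finalizer runs, so a missing message does not prove that nothing leaked. A similar check can be sketched with java.lang.ref.Cleaner, which has the same timing caveat but avoids overriding finalize. The classes below (LeakCheckDemo, LeakCheck, TrackedTask) are hypothetical and track a plain boolean flag rather than a real Micrometer sample.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo entry point.
class LeakCheckDemo {
    public static void main(String[] args) {
        TrackedTask task = new TrackedTask();
        task.run(); // marks the task as stopped, so its cleanup action will report nothing
        System.out.println("Leaks reported so far: " + LeakCheck.REPORTED.get());
    }
}

// Cleanup action run by the Cleaner once the tracked object becomes unreachable.
// It must not reference the tracked object itself, or that object could never be collected.
final class LeakCheck implements Runnable {
    static final AtomicInteger REPORTED = new AtomicInteger();

    private final AtomicBoolean stopped;

    LeakCheck(AtomicBoolean stopped) {
        this.stopped = stopped;
    }

    @Override
    public void run() {
        if (!stopped.get()) {
            REPORTED.incrementAndGet();
            System.out.println("Pending task was not stopped");
        }
    }
}

// Stand-in for a runnable wrapper: the flag is shared with the cleanup action.
final class TrackedTask implements Runnable {
    private static final Cleaner CLEANER = Cleaner.create();

    private final AtomicBoolean stopped = new AtomicBoolean();

    TrackedTask() {
        CLEANER.register(this, new LeakCheck(stopped));
    }

    @Override
    public void run() {
        stopped.set(true);
    }

    void dispose() {
        stopped.set(true);
    }
}
```

Either way, such a check can only confirm a leak when a message appears. Counting active tasks on the timer, as in the assertion above, is the more direct signal.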

Steps to Reproduce

package org.example;

import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class Main {
    public static void main(String[] args) throws Exception {
        var scheduler = Micrometer.timedScheduler(
                Schedulers.newBoundedElastic(10, 10000, "boundedElastic"),
                new SimpleMeterRegistry(),
                "test");

        var totalCalls = new AtomicLong();
        var iterations = 50_000;
        new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                Mono.delay(Duration.ofMillis(5))
                        .subscribeOn(scheduler)
                        .timeout(Duration.ofMillis(20), scheduler)
                        .subscribe(__ -> totalCalls.incrementAndGet(), __ -> totalCalls.incrementAndGet());
            }
        }).start();

        while (totalCalls.get() < iterations) {
            Thread.sleep(1000);
            System.out.printf("Progress: %.1f\n", 100d / iterations * totalCalls.get());
        }

        scheduler.dispose();

        Field field = scheduler.getClass().getDeclaredField("pendingTasks");
        field.setAccessible(true);
        DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
        System.out.println("Pending Tasks: " + pendingTasks.activeTasks());
        assertEquals(0, pendingTasks.activeTasks());
    }

}

Possible Solution

???

Your Environment

  • io.projectreactor:reactor-core:3.6.8
  • io.projectreactor:reactor-core-micrometer:1.1.8
  • io.micrometer:micrometer-core:1.13.2
  • M1 MacBook Pro
  • Java 17
