Console logger factory concurrency and potential memory leak #3170

@alexismanin

Description

I've encountered the following error while calling Flux.log after switching logging to the console logger:

java.util.ConcurrentModificationException
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1221)
	at reactor.util.Loggers$ConsoleLoggerFactory.apply(Loggers.java:634)
	at reactor.util.Loggers$ConsoleLoggerFactory.apply(Loggers.java:622)
	at reactor.util.Loggers.getLogger(Loggers.java:182)
	at reactor.core.publisher.SignalLogger.<init>(SignalLogger.java:119)
	at reactor.core.publisher.SignalLogger.<init>(SignalLogger.java:75)
	at reactor.core.publisher.Flux.log(Flux.java:6194)
	at reactor.core.publisher.Flux.log(Flux.java:6164)
	at reactor.core.publisher.Flux.log(Flux.java:6121)
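
For context, the exception at the top of the trace comes from a modification check inside HashMap.computeIfAbsent. The sketch below is not the Reactor code: it is a hypothetical, single-threaded stand-in (the class and method names are made up) that triggers the same check deterministically by changing the map while the mapping function runs. It assumes Java 9 or later, where this check exists. In the reported case the interleaved change would come from another thread rather than from the mapping function itself.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not part of Reactor. */
final class ComputeIfAbsentCheck {

    /**
     * Returns true if HashMap.computeIfAbsent notices that the map was
     * structurally modified while the mapping function was running.
     */
    static boolean detectsInterleavedModification() {
        Map<String, String> map = new HashMap<>();
        try {
            map.computeIfAbsent("a", key -> {
                // Structural change made while the computation for "a" is in progress.
                map.put("b", "other");
                return "value";
            });
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }
}
```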

By inspecting the console logger factory, I've seen that no synchronization happens, and that loggers are stored in an internal static HashMap that is never cleaned. I think the following improvements should be made:

  • If ConsoleLoggerFactory returned a new logger instance each time, any memory leak or concurrency problem would disappear. Console loggers do not look heavyweight, so that might be the best solution.
  • If caching loggers is really desired, then we must:
    • Manage concurrency in the console factory by replacing the HashMap with a ConcurrentHashMap, or at least by synchronizing the HashMap calls.
    • Address potential memory leak problems. I am not sure how to do it properly here. As Java does not provide any WeakValueHashMap, I do not have a solution in mind yet.
  • Side task: improve the useCustomLoggers method javadoc by specifying that it is the user's responsibility to ensure proper state synchronization in case the provided function is not "pure".
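
To make the caching option more concrete, here is a minimal sketch of a thread-safe cache with weakly held values. It is only an illustration under assumptions: CachingLoggerFactory and its creator parameter are hypothetical names, not Reactor's API, and the logger type is left generic. It combines a ConcurrentHashMap with WeakReference values, so a cached logger can be collected once nothing else references it.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical stand-in for a caching logger factory; not Reactor's actual code. */
final class CachingLoggerFactory<L> implements Function<String, L> {

    // Values are weakly referenced so the cache alone does not keep loggers alive.
    private final ConcurrentMap<String, WeakReference<L>> cache = new ConcurrentHashMap<>();
    private final Function<String, L> creator;

    CachingLoggerFactory(Function<String, L> creator) {
        this.creator = creator;
    }

    @Override
    public L apply(String name) {
        while (true) {
            WeakReference<L> ref = cache.get(name);
            L logger = (ref == null) ? null : ref.get();
            if (logger != null) {
                return logger; // live cached instance
            }
            // No entry, or the referent was collected: try to install a fresh logger.
            L created = creator.apply(name);
            WeakReference<L> fresh = new WeakReference<>(created);
            boolean installed = (ref == null)
                    ? cache.putIfAbsent(name, fresh) == null
                    : cache.replace(name, ref, fresh);
            if (installed) {
                return created;
            }
            // Another thread won the race; loop and read its entry.
        }
    }
}
```

Note that this sketch only bounds the leak: entries whose logger has been collected still keep their key and an empty reference in the map. Removing them would need extra bookkeeping, for example a ReferenceQueue. If only the concurrency problem matters, a plain ConcurrentHashMap with computeIfAbsent would be simpler.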

If a library maintainer agrees with the analysis above, I will be glad to provide fixes as a PR.

Note: for reference, the snippet that produced the error is embedded below:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Loggers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Reproduce https://stackoverflow.com/questions/73467845/how-can-this-project-reactor-behavior-explained
 */
public class ReactorThreadIsolation {

    public static void main(String[] args) throws InterruptedException {
        Loggers.useConsoleLoggers();
        launch().await(5, TimeUnit.MINUTES);
    }

    private static CountDownLatch launch() {
        Consumer<String> slowConsumer = x -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch(Exception ignore) {
                System.err.println("INTERRUPTED !");
            }
        };
        Flux<String> publisher = Flux
                .just(1, 2, 3, 4)
                .parallel(2)
                .runOn(Schedulers.newParallel("writer", 2, true))
                .flatMap(rail -> Flux.range(1, 300)
                        .map(i -> {
                            String msg = String.format("Rail %d -> Row %d", rail, i);
                            System.out.println("[PRINT] ("+Thread.currentThread()+") "+msg);
                            return msg;
                        })
                        .log())
                .sequential()
                .publishOn(Schedulers.newSingle("reader", true))
                .doOnNext(slowConsumer);

        final CountDownLatch barrier = new CountDownLatch(1);
        publisher.subscribe(
                value -> {},
                err -> { err.printStackTrace() ; barrier.countDown(); },
                () -> barrier.countDown()
        );
        return barrier;
    }
}
