-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Console logger factory concurrency and potential memory leak #3170
Copy link
Copy link
Closed
Labels
type/enhancementA general enhancementA general enhancement
Milestone
Description
I"ve encountered the following error while calling Flux.log after setting logging to console logger :
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1221)
at reactor.util.Loggers$ConsoleLoggerFactory.apply(Loggers.java:634)
at reactor.util.Loggers$ConsoleLoggerFactory.apply(Loggers.java:622)
at reactor.util.Loggers.getLogger(Loggers.java:182)
at reactor.core.publisher.SignalLogger.<init>(SignalLogger.java:119)
at reactor.core.publisher.SignalLogger.<init>(SignalLogger.java:75)
at reactor.core.publisher.Flux.log(Flux.java:6194)
at reactor.core.publisher.Flux.log(Flux.java:6164)
at reactor.core.publisher.Flux.log(Flux.java:6121)
By inspecting console logger factory, I've seen that no synchronization happens, and also that loggers are stored in an internal static HashMap, that is never cleaned. I think the following improvements should be done :
- if ConsoleLoggerFactory returns a new logger instance each time, it would make any memory leak/concurreny problem disappear. Console loggers do not look heavy-weight, that might be the best solution.
- If caching loggers is really desired, then, we must :
- manage concurrency in console factory by replacing the hashmap with a ConcurrentHashMap, or at least synchronize HashMap calls
- Adress potential memory leak problems. Not sure how to do it properly here. As Java does not provide any WeakValueHashMap, I do not have a solution in mind yet.
- Side task : improve
useCustomLoggersmethod javadoc, by specifying that it is user responsability to ensure proper state synchronization management in case provided function is not "pure".
if a library maintainer agree with the analysis above, I will be glad to provide fixes as a PR.
Note: for reference, the snippet that produced the error is embedded below:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Loggers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Reproduce https://stackoverflow.com/questions/73467845/how-can-this-project-reactor-behavior-explained
*/
public class ReactorThreadIsolation {
public static void main(String[] args) throws InterruptedException {
Loggers.useConsoleLoggers();
launch().await(5, TimeUnit.MINUTES);
}
private static CountDownLatch launch() {
Consumer<String> slowConsumer = x -> {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch(Exception ignore) {
System.err.println("INTERRUPTED !");
}
};
Flux<String> publisher = Flux
.just(1, 2, 3, 4)
.parallel(2)
.runOn(Schedulers.newParallel("writer", 2, true))
.flatMap(rail -> Flux.range(1, 300)
.map(i -> {
String msg = String.format("Rail %d -> Row %d", rail, i);
System.out.println("[PRINT] ("+Thread.currentThread()+") "+msg);
return msg;
})
.log())
.sequential()
.publishOn(Schedulers.newSingle("reader", true))
.doOnNext(slowConsumer);
final CountDownLatch barrier = new CountDownLatch(1);
publisher.subscribe(
value -> {},
err -> { err.printStackTrace() ; barrier.countDown(); },
() -> barrier.countDown()
);
return barrier;
}
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
type/enhancementA general enhancementA general enhancement