It seems that Sinks.many().multicast() retains subscribers after their subscriptions are cancelled when subscription.dispose() is called concurrently from multiple threads.
We hit the memory leak described in #3001 and upgraded to 3.4.17, which improved things considerably. However, a memory leak remains in another emitter, SinkManySerialized. The following test reproduces the issue:
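import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Disposable;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.util.concurrent.Queues;

@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5})
public void testConcurrentUnsubscribe(int threadCount) {
    int subsCount = 10;
    Many<Object> emitter = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    // Subscribe subsCount times to the same sink.
    List<Disposable> subscriptions = IntStream.range(0, subsCount)
            .mapToObj(i -> emitter.asFlux().subscribe())
            .toList();
    // Dispose every subscription from a pool of threadCount threads.
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
    List<? extends Future<?>> disposeFutures = subscriptions.stream()
            .map(subscription -> executor.submit(subscription::dispose))
            .toList();
    // Wait until every dispose call has finished.
    List<?> disposals = disposeFutures.stream().map(this::getQuietly).toList();
    executor.shutdown();
    Assertions.assertEquals(subsCount, disposals.size());
    // Fails intermittently when threadCount > 1.
    Assertions.assertEquals(0, emitter.currentSubscriberCount());
}

private <T> T getQuietly(Future<T> it) {
    try {
        return it.get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}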
The test passes with a single-threaded executor and is more likely to fail with 2-5 threads. The expected behavior is that no subscribers remain, even when subscription.dispose() executes concurrently.
Workaround
As a temporary workaround, we run all subscription.dispose() calls on a single-threaded executor.
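A minimal sketch of this workaround, using only the JDK. The class name SerialDisposer and its methods are hypothetical and not part of Reactor; the helper accepts plain Runnable actions so that subscription::dispose can be passed as a method reference. It only avoids concurrent dispose calls on the caller's side and does not fix the sink itself.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: funnels every dispose call through one thread. */
final class SerialDisposer implements AutoCloseable {

    // A single worker thread, so queued dispose actions never overlap.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Queues one dispose action, e.g. subscription::dispose. */
    CompletableFuture<Void> dispose(Runnable disposeAction) {
        return CompletableFuture.runAsync(disposeAction, executor);
    }

    /** Queues several dispose actions; completes when all of them have run. */
    CompletableFuture<Void> disposeAll(List<? extends Runnable> disposeActions) {
        CompletableFuture<?>[] pending = disposeActions.stream()
                .map(this::dispose)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending);
    }

    /** Stops accepting new actions; already queued ones still run. */
    @Override
    public void close() {
        executor.shutdown();
    }
}
```

With Reactor on the classpath, a call would look like disposer.dispose(subscription::dispose), optionally followed by join() when the caller needs to wait for the disposal to finish.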
Environment
- Reactor version: 3.4.17
- Spring WebFlux, Netty
- JVM: Corretto 17