-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Closed
Milestone
Description
Problem description:
Flux hangs when GroupedFlux is used together with ParallelFlux. i.e. when we use parallel to process groups created by groupBy. See code below for exact scenario.
Expected Behavior
ParallelFlux can be used together with GroupedFlux to processes data and finish without errors.
Actual Behavior
Flux hangs and stops processing data.
Steps to Reproduce
Looks like there is unexpected interaction when GroupedFlux and ParallelFlux are used together.
(NB parallelism of nested ParallelFlux is irrelevant the issue happens with any parallelism)
Note: if we "hide" GroupedFlux identity then Flux doesn't hang.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class GroupByIssue {
public static void main(String[] args) throws InterruptedException {
int parallelism = 2; // issue can be reproduced with any parallelism
Scheduler process = Schedulers.newParallel("process", parallelism, true);
final long start = System.nanoTime();
Flux.range(0, 4_000_000)
.subscribeOn(Schedulers.newSingle("range", true))
.groupBy(i -> i % 2)
.flatMap(g ->
g.key() == 0
? g //.hide() /* adding hide here fixes the hang issue */
.parallel(parallelism)
.runOn(process)
.map(i -> i)
.sequential()
: g.map(i -> i) // no need to use hide
)
.doOnNext(i -> print(i))
.then()
.block();
System.out.printf("elapsed: %d\n", (System.nanoTime() - start) / 1_000_000);
}
private static void print(int current) {
if (current % 100000 == 0) {
System.out.println("processed: " + current);
}
}
}Environment
- Reactor version used
- reactor-core 3.3.0.RELEASE Dysprosium-SR1
- JVM version
- openjdk version "11.0.4" 2019-07-16
- OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.4+11)
- OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.4+11, mixed mode)
- OS and version
- macOS Mojave 10.14.6
Metadata
Metadata
Assignees
Labels
No labels