Flux hangs when GroupedFlux is used together with ParallelFlux #1959

@kcieniuch-r7

Problem description:

Flux hangs when GroupedFlux is used together with ParallelFlux, i.e. when parallel() is used to process groups created by groupBy(). See the code below for the exact scenario.

Expected Behavior

ParallelFlux can be used together with GroupedFlux to process data and complete without errors.

Actual Behavior

Flux hangs and stops processing data.

Steps to Reproduce

There appears to be an unexpected interaction when GroupedFlux and ParallelFlux are used together.
(NB: the parallelism of the nested ParallelFlux is irrelevant; the issue happens with any parallelism.)

Note: if we hide the GroupedFlux identity by calling hide() on the group, the Flux doesn't hang.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class GroupByIssue {

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 2; // issue can be reproduced with any parallelism

        Scheduler process = Schedulers.newParallel("process", parallelism, true);

        final long start = System.nanoTime();

        Flux.range(0, 4_000_000)
            .subscribeOn(Schedulers.newSingle("range", true))
            .groupBy(i -> i % 2)
            .flatMap(g ->
                g.key() == 0
                    ? g //.hide()  /* adding hide here fixes the hang issue */
                        .parallel(parallelism)
                        .runOn(process)
                        .map(i -> i)
                        .sequential()
                    : g.map(i -> i) // no need to use hide
            )
            .doOnNext(i -> print(i))
            .then()
            .block();

        // Elapsed wall-clock time, converted from nanoseconds to milliseconds
        System.out.printf("elapsed: %d ms%n", (System.nanoTime() - start) / 1_000_000);
    }

    private static void print(int current) {
        if (current % 100000 == 0) {
            System.out.println("processed: " + current);
        }
    }
}
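
For comparison, here is a minimal sketch of the same pipeline with the hide() workaround enabled. It assumes reactor-core is on the classpath. The class name GroupByHideWorkaround, the run helper, the smaller element count, and the 60-second block timeout are illustrative choices, not part of the original reproducer. The timeout is only there so that a hang surfaces as an exception instead of blocking forever.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class GroupByHideWorkaround {

    // Hypothetical helper: runs the pipeline with hide() applied to the
    // parallel branch and returns the number of elements that came through.
    static long run(int count, int parallelism) {
        Scheduler process = Schedulers.newParallel("process", parallelism, true);
        Scheduler range = Schedulers.newSingle("range", true);
        try {
            Long seen = Flux.range(0, count)
                .subscribeOn(range)
                .groupBy(i -> i % 2)
                .flatMap(g ->
                    g.key() == 0
                        ? g.hide() // hides the GroupedFlux identity before parallel()
                            .parallel(parallelism)
                            .runOn(process)
                            .map(i -> i)
                            .sequential()
                        : g.map(i -> i))
                .count()
                // Assumption: 60 s is long enough here; block(Duration) throws
                // IllegalStateException if the pipeline has not completed by then.
                .block(Duration.ofSeconds(60));
            return seen;
        } finally {
            // Release the worker threads created for this run.
            process.dispose();
            range.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run(1_000_000, 2));
    }
}
```

Removing the hide() call from this sketch should bring back the scenario described above, in which case the block timeout would be expected to fire rather than the program hanging indefinitely.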

Environment

  • Reactor version used
    • reactor-core 3.3.0.RELEASE Dysprosium-SR1
  • JVM version
    • openjdk version "11.0.4" 2019-07-16
    • OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.4+11)
    • OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.4+11, mixed mode)
  • OS and version
    • macOS Mojave 10.14.6
