
flatMap concurrency decrease in case of simultaneous finishing in the child observables #6282

@yklymenko


RxJava 2.2.3

The following code reproduces the problem:

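import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

public class TestObs {

    public static void main(String[] args) {

        // Number of inner Observables currently running
        AtomicInteger count = new AtomicInteger();

        Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
        Observable.range(0, 400)
                .flatMap(i -> Observable.just(i).subscribeOn(scheduler).map(x -> {
                     System.out.println("Start " + Thread.currentThread().getName()
                        + " " + x + " concurrent " + count.incrementAndGet());
                     // Equal sleep times make many inners finish at about the same moment
                     Thread.sleep(1000);
                     System.out.println("End " + Thread.currentThread().getName()
                        + " " + x + " concurrent " + count.decrementAndGet());

                     return x;
                  }),
                  30 // maxConcurrency
                ).ignoreElements().blockingGet();
    }
}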

Toward the end of the run, the output shows only 1 inner Observable executing concurrently, although the run starts with 30.

This concerns the flatMap operator when it is given a concurrency argument. With a concurrency of Integer.MAX_VALUE and a fixed thread pool, everything works as expected.
The problem occurs when more than one inner Observable completes within the same drainLoop pass.
I think the cause is here (io.reactivex.internal.operators.observable.ObservableFlatMap, line 448):

                        SimpleQueue<U> innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            if (checkTerminate()) {
                                return;
                            }
                            innerCompleted = true;
                        }

More than one inner can be removed this way, but the innerCompleted flag pulls in only one replacement (io.reactivex.internal.operators.observable.ObservableFlatMap, line 468).
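
The suspected effect can be illustrated with a toy model. This is only a sketch of the counting argument above, not the actual ObservableFlatMap code: the class DrainModel, the method activeAfter, and its parameters are hypothetical, and the model assumes the upstream always has another source available. Each pass removes the inners that finished, then subscribes either a single replacement (a boolean flag, as suspected) or one replacement per removed inner.

```java
public class DrainModel {

    /**
     * Hypothetical model of repeated drain passes.
     *
     * @param maxConcurrency  number of inners active at the start
     * @param finishedPerPass inners that complete during each pass
     * @param passes          number of drain passes to simulate
     * @param replaceAll      true: one replacement per removed inner;
     *                        false: at most one replacement per pass
     * @return number of active inners after the last pass
     */
    static int activeAfter(int maxConcurrency, int finishedPerPass,
                           int passes, boolean replaceAll) {
        int active = maxConcurrency;
        for (int p = 0; p < passes; p++) {
            // Inners that finished in this pass are removed
            int removed = Math.min(finishedPerPass, active);
            active -= removed;

            // A boolean flag can only record "at least one completed"
            int replacements = replaceAll ? removed : Math.min(1, removed);
            active += replacements;
        }
        return active;
    }

    public static void main(String[] args) {
        // Two inners finish per pass, one replacement per pass
        System.out.println(activeAfter(30, 2, 40, false)); // 1
        // Two inners finish per pass, every removed inner is replaced
        System.out.println(activeAfter(30, 2, 40, true));  // 30
        // Only one inner finishes per pass: a single replacement is enough
        System.out.println(activeAfter(30, 1, 40, false)); // 30
    }
}
```

In this model the active count drops by one in every pass where two inners finish together and settles at 1, which matches the output of the reproducer. Whether the real operator behaves the same way depends on details of drainLoop that the model leaves out.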
