-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Milestone
Description
RxJava 2.2.3
Following code reproduces the problem:
public class TestObs {
static public void main(String[] args) {
AtomicInteger count = new AtomicInteger();
Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
Observable.range(0, 400)
.flatMap(i -> Observable.just(i).subscribeOn(scheduler).map(x -> {
System.out.println("Start " + Thread.currentThread().getName()
+ " " + x + " concurrent " + count.incrementAndGet());
Thread.sleep(1000);
System.out.println("End " + Thread.currentThread().getName()
+ " " + x + " concurrent " + count.decrementAndGet());
return x;
}),
30
).ignoreElements().blockingGet();
}
}At the end you will see that only 1 child will be executed concurrently. At the beginning it is 30.
It is about flatMap operator with concurrency argument. In the case if you use concurrency Integer.MAX_VALUE with fixed pool is everything ok.
The problem happens in the case if more than one inner process are done in the same drainLoop.
I think it is here (io.reactivex.internal.operators.observable.ObservableFlatMap 448)
SimpleQueue<U> innerQueue = is.queue;
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted = true;
}More then one will be removed, but innerCompleted pulls only one as replacement (io.reactivex.internal.operators.observable.ObservableFlatMap 468)