-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Labels
Description
- RxJava 2.2.5
It seems that Completable passed to Completable.andThen(...) is not run on scheduler passed to observeOn. So
Completable.fromAction(() -> System.out.println("subscribing in thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.io())
.andThen(Completable.fromAction(() -> System.out.println("and then in thread " + Thread.currentThread().getName())))
.subscribe(() -> System.out.println("observing in thread " + Thread.currentThread().getName()));
might produce
subscribing in thread main
and then in thread main
observing in thread main
I don't know if it's by design or not, but it seems to be caused by 'race condition' in CompletableConcatArray where we have do {...} while() loop, when next() method is run second time from io() thread and bumps up the atomic counter, and then second iteration of do/while loop starts, and executes completable from andThen operator from main thread. It can be easily reproduced if you add some work to that loop before checking condition in while clause so in CompletableConcatArray:
a[idx].subscribe(this);
// spin the cpu for a while to make ObserveOnCompletable run this next() method
int i = 0, j = 0;
while (i < 1000_000) {
i++;
while (j < 1000_000) {
j++;
}
}
} while (decrementAndGet() != 0);