2.2.5: observeOn might not work for Completable.andThen(Completable) #6354

@pjastrz

  • RxJava 2.2.5

It seems that the Completable passed to Completable.andThen(...) is not always run on the scheduler passed to observeOn. For example,

        Completable.fromAction(() -> System.out.println("subscribing in thread " + Thread.currentThread().getName()))
                .observeOn(Schedulers.io())
                .andThen(Completable.fromAction(() -> System.out.println("and then in thread " + Thread.currentThread().getName())))
                .subscribe(() -> System.out.println("observing in thread " + Thread.currentThread().getName()));

might produce

subscribing in thread main
and then in thread main
observing in thread main

I don't know whether this is by design, but it seems to be caused by a race condition in CompletableConcatArray, which drives its sources from a do {...} while() loop. When the next() method is called a second time, from the io() thread, it bumps up the atomic counter. The main thread, which is still inside the loop, then starts a second iteration and executes the Completable from the andThen operator on the main thread. It can be easily reproduced by adding some work to that loop before the condition in the while clause is checked, so in CompletableConcatArray:

                a[idx].subscribe(this);
                // spin the cpu for a while to make ObserveOnCompletable run this next() method
                int i = 0, j = 0;
                while (i < 1000_000) {
                    i++;
                    while (j < 1000_000) {
                        j++;
                    }
                }
            } while (decrementAndGet() != 0);
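
The interleaving can also be forced deterministically without touching RxJava. The following is a minimal sketch in plain Java, assuming the counter-guarded do/while pattern described above; `DrainLoopRace`, `Sequencer`, `Step` and `secondStepThread` are hypothetical names for illustration, not the RxJava source. A latch stands in for the CPU-spinning delay, so the "worker" thread always calls next() while the calling thread is still inside the loop body.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the loop described in this issue; NOT the RxJava source.
final class DrainLoopRace {

    /** A unit of work that must call onDone exactly once when it finishes. */
    interface Step {
        void run(Runnable onDone);
    }

    /** Runs steps one after another using a counter-guarded do/while loop. */
    static final class Sequencer extends AtomicInteger {
        private final Step[] steps;
        private int index;

        Sequencer(Step... steps) {
            this.steps = steps;
        }

        void next() {
            // Only the caller that moves the counter from 0 to 1 enters the loop;
            // any other caller just records that one more step is due and returns.
            if (getAndIncrement() != 0) {
                return;
            }
            do {
                int idx = index++;
                if (idx == steps.length) {
                    return;
                }
                steps[idx].run(this::next);
            } while (decrementAndGet() != 0);
        }
    }

    /** Returns the name of the thread that ended up running the second step. */
    static String secondStepThread() {
        String[] ranOn = new String[1];

        // First step: signals completion from a "worker" thread,
        // playing the role of the observeOn(Schedulers.io()) hop.
        Step first = onDone -> {
            CountDownLatch signalled = new CountDownLatch(1);
            Thread worker = new Thread(() -> {
                onDone.run();
                signalled.countDown();
            }, "worker");
            worker.start();
            try {
                // Stands in for the CPU-spinning delay: the caller is still inside
                // the loop body when the worker calls next().
                signalled.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        };

        // Second step: records its thread, then completes synchronously.
        Step second = onDone -> {
            ranOn[0] = Thread.currentThread().getName();
            onDone.run();
        };

        new Sequencer(first, second).next();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("caller thread:      " + Thread.currentThread().getName());
        System.out.println("second step ran on: " + secondStepThread());
    }
}
```

In this model the second step always runs on the thread that called next() first, never on "worker", because the worker's call only increments the counter and the loop owner picks up the extra iteration. Whether the real operator behaves this way in every case is exactly the question raised here.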
