Single.observeOn wrongly interrupt thread #6373

@ArtemShaban

I reproduced this issue on RxJava 2.2.5.

I encountered an issue where Single.observeOn interrupts the thread that is performing the downstream operation.

Below is the code of a unit test, followed by a version of the "SingleObserveOn" class that was slightly modified for test purposes.

@Test
public void test() throws InterruptedException
{
    Single.ambArray
            (
                    Single
                            .fromCallable(() ->
                            {
                                System.out.println(System.currentTimeMillis() + " " + "Callable1! " + Thread.currentThread());
                                return "Qqq";
                            })
                            .subscribeOn(Schedulers.newThread())
                            .observeOn(Schedulers.newThread())
                    ,
                    Single.never()
            )
          .subscribe((s, throwable) ->
          {
              System.out.println(System.currentTimeMillis() + " " + "Subscribe1 and blocking await! " + Thread.currentThread());

              Completable.timer(1000, TimeUnit.MILLISECONDS).blockingAwait();

              System.out.println(System.currentTimeMillis() + " " + "Subscribe2! " + Thread.currentThread());
              System.out.println(s);
          });

    Thread.sleep(10000);
}

public final class SingleObserveOn<T> extends Single<T>
{

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler)
    {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer)
    {
        source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
            implements SingleObserver<T>, Disposable, Runnable
    {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler)
        {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d)
        {
            if (DisposableHelper.setOnce(this, d))
            {
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value)
        {
            this.value = value;

            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess1! " + Thread.currentThread());

            Disposable d = scheduler.scheduleDirect(this);
            // I added this loop to simulate a "busy" thread.
            for (int i = 0; i < 1_000_000; i++)
            {
            }

            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess2! " + Thread.currentThread());

            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e)
        {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run()
        {
            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.run! " + Thread.currentThread());

            Throwable ex = error;
            if (ex != null)
            {
                downstream.onError(ex);
            }
            else
            {
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose()
        {
            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.dispose! " + Thread.currentThread());
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed()
        {
            return DisposableHelper.isDisposed(get());
        }
    }
}

Running this test produces the following output:

1547699583389 Callable1! Thread[RxNewThreadScheduler-1,5,main]
1547699583389 ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main]
1547699583390 ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main]
1547699583393 ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main]
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
	at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
	at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
	at io.reactivex.Completable.blockingAwait(Completable.java:1219)
	at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
	... 11 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
	... 14 more
Exception in thread "RxNewThreadScheduler-2" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
	at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
	at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
	at io.reactivex.Completable.blockingAwait(Completable.java:1219)
	at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
	... 11 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
	... 14 more

Explanation:

Callable1! Thread[RxNewThreadScheduler-1,5,main] - the callable just returns the value.

ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main] - after this line, delivery of the value is scheduled on RxNewThreadScheduler-2 and will be executed at some point in the future. Meanwhile, the current thread, RxNewThreadScheduler-1, stalls for whatever reason.

ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main] - the scheduled task is executing.

ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main] - ObserveOnSingleObserver is disposed because the value was passed to the downstream.

Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main] - we receive the value in the subscribe callback and perform a long operation (for example, a blocking await).

ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main] - RxNewThreadScheduler-1 resumes and continues its work: it replaces the disposable, sees that the current disposable has already been disposed, and therefore cancels the future (the one running the long operation started on RxNewThreadScheduler-2), which interrupts the RxNewThreadScheduler-2 thread.
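
The sequence above can be replayed without RxJava. The following is a minimal sketch in plain java.util.concurrent, not RxJava's actual internals: the class name LateReplaceSketch, the DISPOSED marker and the replace helper are hypothetical stand-ins that only mimic the "replace, but dispose the newcomer if already disposed" behaviour described in the last step. It assumes the scheduled task is backed by a Future that is cancelled with interruption, as the FutureTask frames in the stack trace suggest.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class LateReplaceSketch {

    // Hypothetical marker meaning "this holder has already been disposed".
    private static final Runnable DISPOSED = () -> { };

    // Simplified replace: if the holder is already disposed, the newly
    // offered cancel action is run immediately instead of being stored.
    static boolean replace(AtomicReference<Runnable> holder, Runnable cancelAction) {
        for (;;) {
            Runnable current = holder.get();
            if (current == DISPOSED) {
                cancelAction.run();
                return false;
            }
            if (holder.compareAndSet(current, cancelAction)) {
                return true;
            }
        }
    }

    // Returns true if the "downstream" blocking call was interrupted.
    static boolean downstreamWasInterrupted() throws InterruptedException {
        ExecutorService observeOnThread = Executors.newSingleThreadExecutor();
        AtomicReference<Runnable> holder = new AtomicReference<>();
        CountDownLatch downstreamBlocked = new CountDownLatch(1);
        CountDownLatch downstreamDone = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();

        // Step 1: the "upstream" thread schedules delivery on another thread.
        Future<?> future = observeOnThread.submit(() -> {
            // Step 2: the downstream disposes the holder once it has the value.
            holder.set(DISPOSED);
            downstreamBlocked.countDown();
            try {
                // Step 3: the downstream starts a long blocking operation.
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                downstreamDone.countDown();
            }
        });

        // The "busy" upstream thread resumes only after the task is running.
        downstreamBlocked.await();

        // Step 4: the late replace sees DISPOSED and cancels the running task.
        replace(holder, () -> future.cancel(true));

        downstreamDone.await(10, TimeUnit.SECONDS);
        observeOnThread.shutdownNow();
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("downstream interrupted: " + downstreamWasInterrupted());
    }
}
```

In this sketch the cancel arrives while the task is still inside its blocking call, so the sleep ends with an InterruptedException, mirroring the interrupted blockingAwait in the log above.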
