-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Description
I tested issue on RxJava 2.2.5
I encountered the issue when Single.observeOn interrupt the thread, that performs downstream operation.
Below code of unit test and a slightly modified for test purposes "SingleObserveOn" class.
@Test
public void test() throws InterruptedException
{
Single.ambArray
(
Single
.fromCallable(() ->
{
System.out.println(System.currentTimeMillis() + " " + "Callable1! " + Thread.currentThread());
return "Qqq";
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
,
Single.never()
)
.subscribe((s, throwable) ->
{
System.out.println(System.currentTimeMillis() + " " + "Subscribe1 and blocking await! " + Thread.currentThread());
Completable.timer(1000, TimeUnit.MILLISECONDS).blockingAwait();
System.out.println(System.currentTimeMillis() + " " + "Subscribe2! " + Thread.currentThread());
System.out.println(s);
});
Thread.sleep(10000);
}
public final class SingleObserveOn<T> extends Single<T>
{
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler)
{
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer)
{
source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable
{
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler)
{
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d)
{
if (DisposableHelper.setOnce(this, d))
{
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value)
{
this.value = value;
System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess1! " + Thread.currentThread());
Disposable d = scheduler.scheduleDirect(this);
//I added this loop for simulate "busy" thread situation.
for (int i = 0; i < 1_000_000; i++)
{
}
System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess2! " + Thread.currentThread());
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e)
{
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run()
{
System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.run! " + Thread.currentThread());
Throwable ex = error;
if (ex != null)
{
downstream.onError(ex);
}
else
{
downstream.onSuccess(value);
}
}
@Override
public void dispose()
{
System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.dispose! " + Thread.currentThread());
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed()
{
return DisposableHelper.isDisposed(get());
}
}
}
Result of run this test next:
1547699583389 Callable1! Thread[RxNewThreadScheduler-1,5,main]
1547699583389 ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main]
1547699583390 ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main]
1547699583393 ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main]
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
at io.reactivex.Completable.blockingAwait(Completable.java:1219)
at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
... 11 more
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
... 14 more
Exception in thread "RxNewThreadScheduler-2" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
at io.reactivex.Completable.blockingAwait(Completable.java:1219)
at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
... 11 more
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
... 14 more
explanation:
Callable1! Thread[RxNewThreadScheduler-1,5,main] - just return value
ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main] - after this line value scheduled to RxNewThreadScheduler-2, and sometime in the future will be excuted. And current thread RxNewThreadScheduler-1 stuck for any reason.
ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main] - scheduled value ecxecuting
ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main] - ObserveOnSingleObserver is disposing, cause value was throw to the donwstream
Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main] - we get value in subscribe callback and do some long operaion (for example blocking await)
ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main] - RxNewThreadScheduler-1 revive and continue do his work...so it replace disposable, and see than current disposable have already disposed...and he cancel future (that related long operation started in RxNewThreadScheduler-2), and interrupt RxNewThreadScheduler-2 thread.