-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Labels
Description
It looks like when using a scheduler created from Schedulers#from using timeout will not interrupt the task:
Version 2.2.5
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
Scheduler schedulerFromExecutor = Schedulers.from(scheduledExecutorService);
Flowable<Integer> results = Flowable.fromIterable(()-> {
return new Iterator<Integer>() {
@Override
public boolean hasNext()
{
try
{
Thread.sleep(1000);
return false;
}
catch (InterruptedException e)
{
System.out.println("Interrupted! " + e);
return true;
}
}
@Override
public Integer next()
{
return 2;
}
};
}).subscribeOn(schedulerFromExecutor);//change to Schedulers.io() to make it work.
results.timeout(500, TimeUnit.MILLISECONDS, Schedulers.single(), Flowable.error(new TimeoutException("Timed out")))
.doOnTerminate(()-> System.out.println("Finished"))
.subscribe(r-> System.out.println("Got " + r), e-> System.out.println("Error " + e));
Thread.sleep(2000);Output when using schedulerFromExecutor
Finished
Error java.util.concurrent.TimeoutException: Timed out
Output when using Schedulers.io()
Finished
Interrupted! java.lang.InterruptedException: sleep interrupted
Error java.util.concurrent.TimeoutException: Timed out