Skip to content

Interruption does not happen when timing out a task scheduled on Schedulers.from(executor) #6368

@BrynCooke

Description

@BrynCooke

It looks like when using a scheduler created from Schedulers#from using timeout will not interrupt the task:

Version 2.2.5

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        Scheduler schedulerFromExecutor = Schedulers.from(scheduledExecutorService);

        Flowable<Integer> results = Flowable.fromIterable(()-> {

            return new Iterator<Integer>() {
                @Override
                public boolean hasNext()
                {
                    try
                    {
                        Thread.sleep(1000);
                        return false;
                    }
                    catch (InterruptedException e)
                    {
                        System.out.println("Interrupted! " + e);
                        return true;
                    }
                }

                @Override
                public Integer next()
                {
                    return 2;
                }
            };

        }).subscribeOn(schedulerFromExecutor);//change to Schedulers.io() to make it work.

        results.timeout(500, TimeUnit.MILLISECONDS, Schedulers.single(), Flowable.error(new TimeoutException("Timed out")))
                .doOnTerminate(()-> System.out.println("Finished"))
                .subscribe(r-> System.out.println("Got " + r), e-> System.out.println("Error " + e));

        Thread.sleep(2000);

Output when using schedulerFromExecutor

Finished
Error java.util.concurrent.TimeoutException: Timed out

Output when using Schedulers.io()

Finished
Interrupted! java.lang.InterruptedException: sleep interrupted
Error java.util.concurrent.TimeoutException: Timed out

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions