2.x: Disposable already set! using delaySubscription with takeUntil/repeatWhen #6358

@eleventigers

Issue encountered in: io.reactivex.rxjava2:rxjava:2.2.4

When takeUntil and repeatWhen are combined on an Observable that also has delaySubscription applied, RepeatWhenObserver#onSubscribe can throw ProtocolViolationException: Disposable already set!. It appears that if the ObservableSource passed to delaySubscription emits multiple notifications rapidly or concurrently, a rare race condition leads to a double subscription.
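To illustrate why a second subscription would surface as this kind of error, here is a minimal sketch of a set-once guard written in plain Java. It is a simplified model under my own assumptions, not RxJava's actual code: the names SetOnceSketch, Handle, setOnce, and violations are all hypothetical, and the violation is recorded in a list rather than routed to a global error hook.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class SetOnceSketch {

    /** Hypothetical stand-in for a subscription handle. */
    interface Handle {
        void dispose();
    }

    /** Records protocol violations (assumption: real code would report them elsewhere). */
    static final List<String> violations = new CopyOnWriteArrayList<>();

    /**
     * Stores the handle only if none has been stored yet. A second caller loses
     * the compare-and-set, has its handle disposed, and a violation is recorded.
     */
    static boolean setOnce(AtomicReference<Handle> field, Handle handle) {
        if (!field.compareAndSet(null, handle)) {
            handle.dispose();
            violations.add("Disposable already set!");
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicReference<Handle> upstream = new AtomicReference<>();
        Handle first = () -> { };
        Handle second = () -> System.out.println("second handle disposed");

        System.out.println(setOnce(upstream, first));  // first subscription is accepted
        System.out.println(setOnce(upstream, second)); // second one is rejected and disposed
        System.out.println(violations);
    }
}
```

In this model, a source that subscribes the same observer twice without the first handle being cleared in between hits the rejected branch, which is the symptom described above.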

I am not sure whether this is a user error (applying take(1) to the ObservableSource supplied to delaySubscription alleviates it) or a bug in RxJava. Either way, the following slow unit test demonstrates it:

    @Test
    public void delaySubscriptionWithRepeatWhen() {
        List<Throwable> errors = TestHelper.trackPluginErrors();

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
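            // Two sources that each emit once after a zero delay, so the merged
            // signal can deliver two notifications almost simultaneously.
            Observable<Object> subscriptionSignal = Observable.merge(
                    Observable.just(new Object()).delay(0, TimeUnit.MILLISECONDS),
                    Observable.just(new Object()).delay(0, TimeUnit.MILLISECONDS)
            );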

            final Subject<Boolean> boolStream = PublishSubject.<Boolean>create().toSerialized();

            Observable<Boolean> brokenStream = boolStream
                    .delaySubscription(subscriptionSignal)
                    .takeUntil(new Predicate<Boolean>()  {
                        @Override
                        public boolean test(Boolean state) throws Exception {
                            return state;
                        }
                    })
                    .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                            return boolStream.filter(new Predicate<Boolean>() {
                                @Override
                                public boolean test(Boolean state) throws Exception {
                                    return !state;
                                }
                            });
                        }
                    });

            final Random random = new Random();
            for (int i = 0; i < 1000000; i++) {
                final TestObserver<Boolean> testObserver = brokenStream.test();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        boolStream.onNext(random.nextBoolean());
                        testObserver
                                .assertNoErrors()
                                .dispose();
                    }
                });
                if (!errors.isEmpty()) {
                    fail("Uncaught fatal errors: " + errors);
                }
            }
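        } finally {
            executor.shutdown();
            try {
                executor.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            // Remove the error handler installed by TestHelper.trackPluginErrors().
            RxJavaPlugins.reset();
        }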
    }
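
For reference, the take(1) workaround mentioned before the test could be sketched as follows. This assumes the io.reactivex.rxjava2:rxjava 2.x artifact is on the classpath; the class DelaySubscriptionWorkaround and the helper delayUntilFirstSignal are hypothetical names introduced only for illustration. It shows how to limit the signal to a single notification and makes no claim about whether the underlying race is a bug.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class DelaySubscriptionWorkaround {

    /**
     * Hypothetical helper (not part of RxJava): delays subscription to source
     * until signal emits, and cuts the signal off after its first item so that
     * delaySubscription never sees a second notification from it.
     */
    static <T> Observable<T> delayUntilFirstSignal(Observable<T> source, Observable<?> signal) {
        return source.delaySubscription(signal.take(1));
    }

    public static void main(String[] args) {
        PublishSubject<Object> signal = PublishSubject.create();
        PublishSubject<Boolean> source = PublishSubject.create();

        delayUntilFirstSignal(source, signal)
                .subscribe(value -> System.out.println("received " + value));

        signal.onNext("go");    // first notification triggers the subscription to source
        signal.onNext("again"); // take(1) has already completed, so this is not forwarded
        source.onNext(true);    // delivered to the subscriber
    }
}
```

In the test above, the equivalent change would be to pass subscriptionSignal.take(1) to delaySubscription.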
