
Flux getting into a hang state on any java.lang.Error or its subclasses #3036

@kushagraThapar

Description


When a thread is reading data from a Flux and the Flux encounters any java.lang.Error (or one of its subclasses), the Flux never terminates. The reading thread blocks forever, and the whole application hangs with it.

The following sample thread dump shows the hang. The main thread is parked inside BlockingIterable$SubscriberIterator.hasNext():

"main" #1 prio=5 os_prio=31 tid=0x00007fb7d1808800 nid=0x1003 waiting on condition [0x000070000279e000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000076be96458> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:179)
	at com.example.common.FluxHangIssue.main(FluxHangIssue.java:42)
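The dump is consistent with a simple mechanism: the consuming thread waits for a next, complete, or error signal that never arrives, because the Error escaped on the producer's thread instead of being delivered to the consumer. The sketch below is a JDK-only analogue under that assumption. It uses no Reactor code and does not reproduce Reactor's internals; SwallowedErrorDemo and Outcome are hypothetical names. The producer's Error ends up stored in a Future that the consumer never looks at, so a consumer using take() would park exactly like the main thread above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only analogue, not Reactor code: the producer fails with an Error before it
// ever signals completion, so nothing wakes a consumer blocked on the queue.
class SwallowedErrorDemo {

    /** What the consumer received, whether anything else arrived, and where the Error went. */
    record Outcome(List<Integer> received, boolean consumerWokeUp, Throwable errorInFuture) {}

    static Outcome run() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Producer mirrors the map() in the issue: emits 0..5, then throws on 6.
            Future<?> producer = worker.submit(() -> {
                for (int n = 0; n < 7; n++) {
                    if (n > 5) {
                        throw new OutOfMemoryError("Custom GC Failure");
                    }
                    queue.add(n);
                }
            });

            // The Error is captured by the Future; it is never put on the queue.
            Throwable errorInFuture = null;
            try {
                producer.get();
            } catch (ExecutionException e) {
                errorInFuture = e.getCause();
            }

            List<Integer> received = new ArrayList<>();
            queue.drainTo(received);

            // A take() here would park forever; a bounded poll shows nothing else arrives.
            Integer next = queue.poll(100, TimeUnit.MILLISECONDS);
            return new Outcome(received, next != null, errorInFuture);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The demo calls producer.get() only to show where the Error went; a consumer that reads only the queue has no way to learn about it.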

Expected Behavior

Flux should not hang. Instead, it should propagate the error to the subscriber, or terminate (crash) the process.
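What "propagating the error" means for a blocking consumer can be sketched with plain JDK types: the producer converts any Throwable into a terminal signal on the queue, so the blocked consumer always wakes up and sees the failure, which is the role an onError signal plays in Reactive Streams. This is a hypothetical illustration (PropagatingDemo and Signal are invented names), not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical JDK-only sketch: the producer turns any Throwable into a terminal
// signal on the queue, so the blocked consumer always wakes up and can rethrow it.
class PropagatingDemo {

    /** A value, or a terminal signal (error == null means the stream completed). */
    private record Signal(Integer value, Throwable error, boolean terminal) {
        static Signal next(int v) { return new Signal(v, null, false); }
        static Signal complete() { return new Signal(null, null, true); }
        static Signal failed(Throwable t) { return new Signal(null, t, true); }
    }

    /** Collects values into sink; returns the terminal error, or null on normal completion. */
    static Throwable run(List<Integer> sink) {
        BlockingQueue<Signal> queue = new LinkedBlockingQueue<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    for (int n = 0; n < 7; n++) {
                        if (n > 5) {
                            throw new OutOfMemoryError("Custom GC Failure");
                        }
                        queue.add(Signal.next(n));
                    }
                    queue.add(Signal.complete());
                } catch (Throwable t) {
                    // Deliver the failure to the consumer instead of letting it escape the thread.
                    queue.add(Signal.failed(t));
                }
            });

            while (true) {
                // take() is safe here: the producer always enqueues a terminal signal.
                Signal s = queue.take();
                if (s.terminal()) {
                    return s.error();
                }
                sink.add(s.value());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        Throwable error = run(values);
        System.out.println("values=" + values + ", error=" + error);
    }
}
```

Catching Throwable this broadly is a trade-off: during a genuine OutOfMemoryError there may not be enough memory left even to enqueue the signal, which is one reason libraries treat some errors as fatal and rethrow them rather than signal them (Reactor, for example, exposes Exceptions.throwIfJvmFatal).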

Actual Behavior

The Flux hangs indefinitely, and so does the thread consuming it.

Steps to Reproduce

The following program reproduces the hang (full source: https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java):

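package com.example.common;

import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FluxHangIssue {

    // SLF4J is assumed here, matching the {} placeholders used below.
    private static final Logger logger = LoggerFactory.getLogger(FluxHangIssue.class);

    public static void main(String[] args) throws InterruptedException {

        // Emits 0..6; element 6 throws an Error (not an Exception) inside map().
        Flux<Integer> integerFlux = Flux.range(0, 7).map(number -> {
            logger.info("Number is : {}", number);
            if (number > 5) {
                throw new OutOfMemoryError("Custom GC Failure");
            }
            return number;
        }).doOnError(ex -> {
            logger.error("Completed exceptionally", ex);
        }).doOnNext(next -> {
            logger.info("Next is : {}", next);
        }).doOnComplete(() -> {
            logger.info("Completed successfully");
        }).doFinally(signalType -> {
            logger.info("Finally signal is : {}", signalType);
        }).onErrorMap(throwable -> {
            logger.info("On error map", throwable);
            return throwable;
        }).onErrorContinue((throwable, object) -> {
            logger.error("on error continue : {}", object, throwable);
        }).onErrorStop().onErrorReturn(6).onErrorResume(throwable -> {
            logger.info("on error resume", throwable);
            return Mono.error(throwable);
        }).subscribeOn(Schedulers.boundedElastic()); // the pipeline runs on a worker thread

        // Blocking iteration on "main": hasNext() parks until the next signal arrives.
        Iterator<Integer> integers = integerFlux.toIterable().iterator();
        while (integers.hasNext()) {
            logger.info("Next value is : {}", integers.next());
        }

        logger.info("Going to sleep now");

        Thread.sleep(5000);

        logger.info("I woke up");
    }
}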

Possible Solution

One possible workaround is to apply Reactor's timeout operator to such a Flux, which avoids the hang and lets Reactor propagate a TimeoutException to the subscriber instead.
The following method in the reproduction repository shows how the hang can be avoided: https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java#L59
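The timeout idea amounts to bounding every wait. In Reactor that would be the timeout(Duration) operator applied before toIterable(). The sketch below is a JDK-only analogue of that idea, not the linked method: a hypothetical helper, nextOrTimeout, uses BlockingQueue.poll(timeout, unit) in place of an unbounded take(), so a producer that dies silently surfaces as a TimeoutException instead of a parked thread. TimeoutDemo and the 1000 ms value are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only analogue of applying a timeout: every wait is bounded, so a producer that
// dies silently surfaces as a TimeoutException instead of a forever-parked thread.
class TimeoutDemo {

    /** Hypothetical helper: waits at most timeoutMs for the next element. */
    static <T> T nextOrTimeout(BlockingQueue<T> queue, long timeoutMs)
            throws InterruptedException, TimeoutException {
        T next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (next == null) {
            throw new TimeoutException("no element within " + timeoutMs + " ms");
        }
        return next;
    }

    /** Collects values into sink and returns the TimeoutException that ended the iteration. */
    static TimeoutException run(List<Integer> sink, long timeoutMs) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Same failing producer: the Error is kept in the (ignored) Future, never queued.
            worker.submit(() -> {
                for (int n = 0; n < 7; n++) {
                    if (n > 5) {
                        throw new OutOfMemoryError("Custom GC Failure");
                    }
                    queue.add(n);
                }
            });

            // This sketch has no completion signal, so iteration always ends with a timeout.
            while (true) {
                sink.add(nextOrTimeout(queue, timeoutMs));
            }
        } catch (TimeoutException e) {
            return e; // the hang becomes an error the caller can handle
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        TimeoutException timeout = run(values, 1000);
        System.out.println("values=" + values + ", " + timeout.getMessage());
    }
}
```

The cost of this approach is picking the timeout: too short and slow but healthy producers fail, too long and the hang is only shortened. A timeout also reports that the producer went silent, not why; the original Error is not part of the TimeoutException and has to be diagnosed separately.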

  • Reactor version(s) used: 3.4.14

Metadata

Assignees: No one assigned
Labels: status/invalid (We don't feel this issue is valid, or the root cause was found outside of Reactor)
