Flux getting into hang state on any java.lang.Error or its subclasses #3036
Description
While a thread is consuming data from a Flux, if it encounters any java.lang.Error or one of its subclasses, the Flux hangs, which causes the whole application to hang.
The following thread dump shows where the main thread is stuck:
"main" #1 prio=5 os_prio=31 tid=0x00007fb7d1808800 nid=0x1003 waiting on condition [0x000070000279e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076be96458> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:179)
at com.example.common.FluxHangIssue.main(FluxHangIssue.java:42)
Expected Behavior
Flux should not hang. It should either propagate the error to the caller or terminate the process.
Actual Behavior
The Flux hangs indefinitely, with the main thread parked in BlockingIterable$SubscriberIterator.hasNext.
Steps to Reproduce
The following program reproduces the hang - https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java
public static void main(String[] args) throws InterruptedException {
    Flux<Integer> integerFlux = Flux.range(0, 7).map(number -> {
        logger.info("Number is : {}", number);
        if (number > 5) {
            throw new OutOfMemoryError("Custom GC Failure");
        }
        return number;
    }).doOnError(ex -> {
        logger.error("Completed exceptionally", ex);
    }).doOnNext(next -> {
        logger.info("Next is : {}", next);
    }).doOnComplete(() -> {
        logger.info("Completed successfully");
    }).doFinally(signalType -> {
        logger.info("Finally signal is : {}", signalType);
    }).onErrorMap(throwable -> {
        logger.info("On error map", throwable);
        return throwable;
    }).onErrorContinue((throwable, object) -> {
        logger.error("on error continue : {}", object, throwable);
    }).onErrorStop().onErrorReturn(6).onErrorResume(throwable -> {
        logger.info("on error resume", throwable);
        return Mono.error(throwable);
    }).subscribeOn(Schedulers.boundedElastic());
    Iterator<Integer> integers = integerFlux.toIterable().iterator();
    while (integers.hasNext()) {
        logger.info("Next value is : {}", integers.next());
    }
    logger.info("Going to sleep now");
    Thread.sleep(5000);
    logger.info("I woke up");
}
Possible Solution
One workaround that I think might work is to apply the timeout operator to such a Flux, so that Reactor propagates a TimeoutException instead of hanging.
This method shows how the hang can be avoided - https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java#L59
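To illustrate the idea behind the timeout workaround without depending on Reactor, here is a minimal sketch using only java.util.concurrent. It is an analogy, not Reactor's internals: a producer thread dies with an Error before it can signal completion, and the consumer uses a bounded wait so that it fails with a TimeoutException instead of parking forever. The class name `BoundedWaitSketch`, the `DONE` marker, and both helper methods are hypothetical names made up for this example. In Reactor itself, the corresponding operator is `Flux.timeout(Duration)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // Hypothetical end-of-stream marker (assumption for this sketch, not a Reactor concept).
    private static final Integer DONE = Integer.MIN_VALUE;

    /** Starts a producer that dies with an Error before it can signal completion. */
    static BlockingQueue<Integer> startFailingProducer() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 7; i++) {
                if (i > 5) {
                    // The Error escapes the thread, so DONE is never enqueued.
                    throw new OutOfMemoryError("Custom GC Failure");
                }
                queue.add(i);
            }
            queue.add(DONE);
        });
        // Suppress the default stack trace printed for the uncaught Error.
        producer.setUncaughtExceptionHandler((thread, error) -> { });
        producer.setDaemon(true);
        producer.start();
        return queue;
    }

    /** Drains the queue, failing with TimeoutException instead of waiting forever. */
    static int drain(BlockingQueue<Integer> queue, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        int count = 0;
        while (true) {
            // An unbounded queue.take() here would never return once the producer has died.
            Integer next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                throw new TimeoutException(
                        "No signal within " + timeoutMillis + " ms after " + count + " items");
            }
            if (next.equals(DONE)) {
                return count;
            }
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            drain(startFailingProducer(), 500);
        } catch (TimeoutException e) {
            System.out.println("Consumer gave up: " + e.getMessage());
        }
    }
}
```

The trade-off is the same as with a timeout on the Flux: the consumer no longer hangs, but it only learns that the stream went silent, not which Error caused it, and the timeout must be longer than any legitimate gap between elements.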
- Reactor version(s) used: 3.4.14