Skip to content

Commit 4c0b791

Browse files
authored
Add onErrorComplete operator and rework onErrorReturn (#3159)
This commit introduces a new implementation that optimizes the onErrorReturn case to avoid going through a generated `Publisher`. Additionally, this implementation can easily accept a `null` fallback value, which covers a common case that has since now been covered by `onErrorResume(e -> Mono.empty())`. We take that as an opportunity to introduce a convenient operator for that later case called `onErrorComplete`.
1 parent 6769cd2 commit 4c0b791

11 files changed

Lines changed: 958 additions & 18 deletions

File tree

docs/asciidoc/apdx-operatorChoice.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ I want to deal with:
217217
** throwing: `error` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#error-java.lang.Throwable-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#error-java.lang.Throwable-[Mono])
218218
** catching an exception:
219219
*** and falling back to a default value: `onErrorReturn` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorReturn-java.lang.Class-T-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorReturn-java.lang.Class-T-[Mono])
220+
*** and swallowing the error (ie. complete): `onErrorComplete` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorComplete-java.lang.Class-T-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorComplete-java.lang.Class-T-[Mono])
220221
*** and falling back to another https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html[Flux] or https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono]: `onErrorResume` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorResume-java.lang.Class-java.util.function.Function-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorResume-java.lang.Class-java.util.function.Function-[Mono])
221222
*** and wrapping and re-throwing: `.onErrorMap(t -> new RuntimeException(t))` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorMap-java.util.function.Function-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorMap-java.util.function.Function-[Mono])
222223
** the finally block: `doFinally` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doFinally-java.util.function.Consumer-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#doFinally-java.util.function.Consumer-[Mono])
@@ -225,6 +226,7 @@ I want to deal with:
225226
* I want to recover from errors...
226227
** by falling back:
227228
*** to a value: `onErrorReturn` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorReturn-java.lang.Class-T-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorReturn-java.lang.Class-T-[Mono])
229+
*** to a completion ("swallowing" the error): `onErrorComplete` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorComplete-java.lang.Class-T-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorComplete-java.lang.Class-T-[Mono])
228230
*** to a https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html?is-external=true[Publisher] or https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono], possibly different ones depending on the error: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorResume-java.lang.Class-java.util.function.Function-[Flux#onErrorResume] and https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorResume-java.lang.Class-java.util.function.Function-[Mono#onErrorResume]
229231
** by retrying...
230232
*** ...with a simple policy (max number of attempts): `retry()` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#retry--[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#retry--[Mono]), `retry(long)` (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#retry-long-[Flux]|link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#retry-long-[Mono])

docs/asciidoc/coreFeatures.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,26 @@ Flux.just(10)
477477
<1> Recover only if the message of the exception is `"boom10"`
478478
====
479479

480+
==== Catch and swallow the error
481+
482+
If you don't even want to replace the exception with a fallback value, but instead to ignore it and
483+
only propagate elements that have been produced so far, what you want is essentially replacing
484+
the `onError` signal with an `onComplete` signal. This can be done by the `onErrorComplete` operator:
485+
486+
====
487+
[source,java]
488+
----
489+
Flux.just(10,20,30)
490+
.map(this::doSomethingDangerousOn30)
491+
.onErrorComplete(); //<1>
492+
----
493+
<1> Recover by turning the `onError` into an `onComplete`
494+
----
495+
====
496+
497+
Like `onErrorReturn`, `onErrorComplete` has variants that let you filter which exceptions
498+
to fall back on, based either on the exception's class or on a `Predicate`.
499+
480500
==== Fallback Method
481501

482502
If you want more than a single default value and you have an alternative (safer) way of
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.publisher;
18+
19+
import org.openjdk.jcstress.annotations.Actor;
20+
import org.openjdk.jcstress.annotations.Arbiter;
21+
import org.openjdk.jcstress.annotations.JCStressTest;
22+
import org.openjdk.jcstress.annotations.Outcome;
23+
import org.openjdk.jcstress.annotations.State;
24+
import org.openjdk.jcstress.infra.results.IIIII_Result;
25+
import org.openjdk.jcstress.infra.results.IIII_Result;
26+
import org.openjdk.jcstress.infra.results.III_Result;
27+
28+
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
29+
import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN;
30+
31+
public abstract class FluxOnErrorReturnStressTest {
32+
33+
@JCStressTest
34+
@Outcome(id = {"1, 1, 0, 1, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r1")
35+
@Outcome(id = {"1, 1, 0, 2, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r2")
36+
@Outcome(id = {"1, 1, 0, 3, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r3 or r1+r2")
37+
@Outcome(id = {"1, 1, 0, 4, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r1+r3")
38+
@Outcome(id = {"1, 1, 0, 5, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r2+r3")
39+
@Outcome(id = {"1, 1, 0, 6, 0"}, expect = ACCEPTABLE, desc = "error triggered fallback after r1+r2+r3")
40+
@Outcome(id = {"1, 1, 0, 1, 1"}, expect = ACCEPTABLE, desc = "deferred fallback until request1")
41+
@Outcome(id = {"1, 1, 0, 2, 1"}, expect = ACCEPTABLE, desc = "deferred fallback until request2")
42+
@Outcome(id = {"1, 1, 0, 3, 1"}, expect = ACCEPTABLE, desc = "deferred fallback until request3")
43+
@Outcome(id = {"1, 1, 0, 0, 0"}, expect = FORBIDDEN, desc = "fallback sent despite no demand")
44+
@State
45+
public static class ErrorFallbackVsRequestStressTest {
46+
47+
final Throwable ERROR = new IllegalStateException("expected");
48+
49+
final StressSubscriber<Integer> subscriber = new StressSubscriber<>(0L);
50+
final FluxOnErrorReturn.ReturnSubscriber<Integer> test = new FluxOnErrorReturn.ReturnSubscriber<>(subscriber, null, 100, true);
51+
final StressSubscription<Integer> topmost = new StressSubscription<>(test);
52+
53+
{
54+
test.onSubscribe(topmost);
55+
}
56+
57+
@Actor
58+
public void error() {
59+
test.onError(ERROR);
60+
}
61+
62+
@Actor
63+
public void request1() {
64+
subscriber.request(1);
65+
}
66+
67+
@Actor
68+
public void request2() {
69+
subscriber.request(2);
70+
}
71+
72+
@Actor
73+
public void request3() {
74+
subscriber.request(3);
75+
}
76+
77+
@Arbiter
78+
public void arbiter(IIIII_Result r) {
79+
r.r1 = subscriber.onNextCalls.get();
80+
r.r2 = subscriber.onCompleteCalls.get();
81+
r.r3 = subscriber.onErrorCalls.get();
82+
r.r4 = (int) topmost.requested;
83+
r.r5 = topmost.cancelled.get() ? 1 : 0;
84+
}
85+
}
86+
}

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6755,6 +6755,52 @@ public final Flux<T> onBackpressureLatest() {
67556755
return onAssembly(new FluxOnBackpressureLatest<>(this));
67566756
}
67576757

6758+
/**
6759+
* Simply complete the sequence by replacing an {@link Subscriber#onError(Throwable) onError signal}
6760+
* with an {@link Subscriber#onComplete() onComplete signal}. All other signals are propagated as-is.
6761+
*
6762+
* <p>
6763+
* <img class="marble" src="doc-files/marbles/onErrorCompleteForFlux.svg" alt="">
6764+
*
6765+
* @return a new {@link Flux} falling back on completion when an onError occurs
6766+
* @see #onErrorReturn(Object)
6767+
*/
6768+
public final Flux<T> onErrorComplete() {
6769+
return onAssembly(new FluxOnErrorReturn<>(this, null, null));
6770+
}
6771+
6772+
/**
6773+
* Simply complete the sequence by replacing an {@link Subscriber#onError(Throwable) onError signal}
6774+
* with an {@link Subscriber#onComplete() onComplete signal} if the error matches the given
6775+
* {@link Class}. All other signals, including non-matching onError, are propagated as-is.
6776+
*
6777+
* <p>
6778+
* <img class="marble" src="doc-files/marbles/onErrorCompleteForFlux.svg" alt="">
6779+
*
6780+
* @return a new {@link Flux} falling back on completion when a matching error occurs
6781+
* @see #onErrorReturn(Class, Object)
6782+
*/
6783+
public final Flux<T> onErrorComplete(Class<? extends Throwable> type) {
6784+
Objects.requireNonNull(type, "type must not be null");
6785+
return onErrorComplete(type::isInstance);
6786+
}
6787+
6788+
/**
6789+
* Simply complete the sequence by replacing an {@link Subscriber#onError(Throwable) onError signal}
6790+
* with an {@link Subscriber#onComplete() onComplete signal} if the error matches the given
6791+
* {@link Predicate}. All other signals, including non-matching onError, are propagated as-is.
6792+
*
6793+
* <p>
6794+
* <img class="marble" src="doc-files/marbles/onErrorCompleteForFlux.svg" alt="">
6795+
*
6796+
* @return a new {@link Flux} falling back on completion when a matching error occurs
6797+
* @see #onErrorReturn(Predicate, Object)
6798+
*/
6799+
public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate) {
6800+
Objects.requireNonNull(predicate, "predicate must not be null");
6801+
return onAssembly(new FluxOnErrorReturn<>(this, predicate, null));
6802+
}
6803+
67586804
/**
67596805
* Let compatible operators <strong>upstream</strong> recover from errors by dropping the
67606806
* incriminating element from the sequence and continuing with subsequent elements.
@@ -7004,9 +7050,11 @@ public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate,
70047050
* @param fallbackValue the value to emit if an error occurs
70057051
*
70067052
* @return a new falling back {@link Flux}
7053+
* @see #onErrorComplete()
70077054
*/
70087055
public final Flux<T> onErrorReturn(T fallbackValue) {
7009-
return onErrorResume(t -> just(fallbackValue));
7056+
Objects.requireNonNull(fallbackValue, "fallbackValue must not be null");
7057+
return onAssembly(new FluxOnErrorReturn<>(this, null, fallbackValue));
70107058
}
70117059

70127060
/**
@@ -7020,10 +7068,11 @@ public final Flux<T> onErrorReturn(T fallbackValue) {
70207068
* @param <E> the error type
70217069
*
70227070
* @return a new falling back {@link Flux}
7071+
* @see #onErrorComplete(Class)
70237072
*/
7024-
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type,
7025-
T fallbackValue) {
7026-
return onErrorResume(type, t -> just(fallbackValue));
7073+
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue) {
7074+
Objects.requireNonNull(type, "type must not be null");
7075+
return onErrorReturn(type::isInstance, fallbackValue);
70277076
}
70287077

70297078
/**
@@ -7036,9 +7085,12 @@ public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type,
70367085
* @param fallbackValue the value to emit if an error occurs that matches the predicate
70377086
*
70387087
* @return a new falling back {@link Flux}
7088+
* @see #onErrorComplete(Predicate)
70397089
*/
70407090
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) {
7041-
return onErrorResume(predicate, t -> just(fallbackValue));
7091+
Objects.requireNonNull(predicate, "predicate must not be null");
7092+
Objects.requireNonNull(fallbackValue, "fallbackValue must not be null");
7093+
return onAssembly(new FluxOnErrorReturn<>(this, predicate, fallbackValue));
70427094
}
70437095

70447096
/**

0 commit comments

Comments
 (0)