Skip to content

Commit 7f6f65f

Browse files
authored
ensures onLastAssembly does not break fusion chain (#3156)
as it was observed, `onLastOperator` may add extra operators to the chain. In the case of scalar fusion at flatMap operators (main/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java#L118) the inner chain may be fused with the outer downstream. Since `onLastOperator` must be applied to the inner chain, modification added by the call may break fusion which leads to the ClastCastException
1 parent 4768c43 commit 7f6f65f

3 files changed

Lines changed: 36 additions & 2 deletions

File tree

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8445,6 +8445,10 @@ public final void subscribe(Subscriber<? super T> actual) {
84458445
CorePublisher publisher = Operators.onLastAssembly(this);
84468446
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
84478447

8448+
if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
8449+
subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
8450+
}
8451+
84488452
try {
84498453
if (publisher instanceof OptimizableOperator) {
84508454
OptimizableOperator operator = (OptimizableOperator) publisher;

reactor-core/src/main/java/reactor/core/publisher/Mono.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4375,6 +4375,10 @@ public final void subscribe(Subscriber<? super T> actual) {
43754375
CorePublisher publisher = Operators.onLastAssembly(this);
43764376
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
43774377

4378+
if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
4379+
subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
4380+
}
4381+
43784382
try {
43794383
if (publisher instanceof OptimizableOperator) {
43804384
OptimizableOperator operator = (OptimizableOperator) publisher;

reactor-core/src/test/java/reactor/core/publisher/HooksTest.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -70,7 +70,6 @@ public TestException(String message) {
7070
}
7171
}
7272

73-
7473
@Test
7574
public void staticActivationOfOperatorDebug() {
7675
String oldProp = System.setProperty("reactor.trace.operatorStacktrace", "true");
@@ -142,6 +141,7 @@ public void onEachOperatorOneHookNoComposite() {
142141
Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> p;
143142
Hooks.onEachOperator(hook);
144143

144+
145145
assertThat(Hooks.onEachOperatorHook).isSameAs(hook);
146146
}
147147

@@ -1254,4 +1254,30 @@ public void onComplete() {
12541254
}
12551255
};
12561256
}
1257+
1258+
// https://github.com/reactor/reactor-core/issues/3137
1259+
@Test
1260+
public void reproduceClassCastExceptionWithHooks() {
1261+
Hooks.onLastOperator(objectPublisher -> {
1262+
if (objectPublisher instanceof Mono) {
1263+
return Hooks.convertToMonoBypassingHooks(objectPublisher, false)
1264+
.doFinally(signalType -> {
1265+
});
1266+
} else {
1267+
return objectPublisher;
1268+
}
1269+
});
1270+
1271+
try {
1272+
Mono.just(1)
1273+
.flatMap(fsm -> Mono.just(1)
1274+
.doOnSubscribe(subscription -> {
1275+
}))
1276+
.doOnSubscribe(subscription -> {
1277+
})
1278+
.block();
1279+
} finally {
1280+
Hooks.resetOnLastOperator();
1281+
}
1282+
}
12571283
}

0 commit comments

Comments
 (0)