Expected Behavior
Flux.thenMany should ignore all emissions of the upstream Flux.concat and emit only the elements of the publisher passed to it.
Actual Behavior
It only ignores emissions of the last concatenated publisher due to an incorrect optimization; emissions of the earlier publishers are still passed downstream.
Steps to Reproduce
    @Test
    void reproCase() {
        Flux.concat(Flux.just("a"), Flux.just("b"))
            // adding a .hide() here fixes the test case
            .thenMany(Flux.just("c"))
            .as(StepVerifier::create)
            .expectNext("c")
            .verifyComplete();
    }
Result:
java.lang.AssertionError: expectation "expectNext(c)" failed (expected value: c; actual value: a)
Adding the .hide() or enabling Hooks.onOperatorDebug() fixes the test case.
Possible Solution
Flux.thenMany uses an incorrect optimization:
    if (this instanceof FluxConcatArray) {
        FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;
        return fluxConcatArray.concatAdditionalIgnoredLast(other); // <- incorrect!
    }
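To illustrate why ignoring only the last concatenated source is not enough, here is a simplified plain-Java model with no Reactor dependency. The class and method names are hypothetical and lists stand in for publishers; it is a sketch of the behavior described above, not Reactor's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each inner list stands in for one concatenated publisher.
public class ThenManyModel {

    // Models the reported behavior: only the LAST existing source is ignored,
    // so every earlier source still emits before `other`.
    static List<String> thenManyIgnoringLastOnly(List<List<String>> sources, List<String> other) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < sources.size() - 1; i++) {
            out.addAll(sources.get(i)); // earlier sources leak through
        }
        out.addAll(other);
        return out;
    }

    // Models the expected behavior: every existing source is ignored,
    // so only the elements of `other` are emitted.
    static List<String> thenManyIgnoringAll(List<List<String>> sources, List<String> other) {
        return new ArrayList<>(other);
    }

    public static void main(String[] args) {
        List<List<String>> concat = List.of(List.of("a"), List.of("b"));
        List<String> other = List.of("c");

        System.out.println(thenManyIgnoringLastOnly(concat, other)); // prints [a, c]
        System.out.println(thenManyIgnoringAll(concat, other));      // prints [c]
    }
}
```

In this model the first emitted value under the "ignore last only" rule is "a", which matches the AssertionError from the repro case. Ignoring only the last source would presumably be safe only if all earlier sources were already known to be ignored, which does not hold for a Flux.concat built directly by the user.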
Your Environment
- Reactor version(s) used: 3.6.1
- JVM version (java -version): openjdk version "21.0.1" 2023-10-17 LTS