Skip to content

Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException #3762

@kz-dt

Description

@kz-dt

Expected Behavior

No exceptions are thrown

Actual Behavior

An exception is thrown:

java.lang.ClassCastException: class reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxRefCount$RefCountInner.setRefCountMonitor(FluxRefCount.java:209)
	at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:85)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4361)
	at org.example.ReactorClassCastExceptionReproducerSample.runSample(ReactorClassCastExceptionReproducerSample.java:44)
	at org.example.ReactorClassCastExceptionReproducerSample.main(ReactorClassCastExceptionReproducerSample.java:18)

Steps to Reproduce

Reproducer is here: https://github.com/kz-dt/reactor_class_cast_exception_reproducer

private <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
      return Operators.lift((a, b) -> b);
}
@Test
void reproCase() {
     Hooks.onEachOperator("testTracingLift", tracingLift());
     Hooks.enableAutomaticContextPropagation();
           
      Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
      Flux<String> flux = sink
	      .asFlux()
	      .doOnRequest(v -> System.out.println("OnDoRequest " + v))
	      .doOnTerminate(() -> System.out.println("doOnTerminate"))
	      .doOnCancel(() -> System.out.println("doOnCancel"))
	      .publish()
	      .refCount();
      
      Mono<List<String>> res = flux.map(s -> s + " mapped").collectList();
      res.subscribe(v -> System.out.println("Received a list of mapped strings: " + v));
}

Possible Solution

It looks like Operators.lift returns wrong subscriber type. Calling Hooks.onOperatorDebug(); "fixes" the issue since it adds another wrapper.

Your Environment

reactor-core 3.6.+
micrometer jar should be in classpath

The issue originally was discovered with SpringBoot 3.2+ and couchbase java client 3.4.11, the dependencies were the following:
org.springframework.boot:spring-boot-starter-webflux:3.2.0
org.springframework.boot:spring-boot-starter-data-couchbase-reactive:3.2.0

  • Reactor version(s) used: 3.6.0 - 3.6.4
  • Other relevant libraries versions (eg. netty, ...): micrometer-tracing:1.2.1
  • JVM version (java -version):
    openjdk version "11.0.20.1" 2023-08-24
    OpenJDK Runtime Environment Temurin-11.0.20.1+1 (build 11.0.20.1+1)
    OpenJDK 64-Bit Server VM Temurin-11.0.20.1+1 (build 11.0.20.1+1, mixed mode)
  • OS and version (eg uname -a): MINGW64_NT-10.0-22631 DT-3Q41FY3 3.4.7-25de8b84.x86_64 2023-08-28 21:32 UTC x86_64 Msys

Metadata

Metadata

Assignees

Labels

area/contextThis issue is related to the Contexttype/bugA general bug

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions