Support for simpler use of Context on different signals #947

@dfeist

Ideally, it should be possible to perform simple operations using the context on next/error signals, such as setting/resetting the current thread's context classloader or setting a ThreadLocal for some legacy code.

Currently, this can be done fairly easily for onNext signals, but in cases where there is no need to do anything with the event, it would be preferable to be able to use the current context in doOnNext or doOnEach.

For error signals, on the other hand, I could not find any simple way to do something with the current context.

It may not make sense to have a context-aware version of all onX methods, but a doOnEach version that receives the current context would at least make these use cases easy to achieve.

Currently, I need to do something like this:

.transform(doOnNextOrErrorWithContext(context -> context.getOrEmpty(TCCL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl))))
.transform(next)
.transform(doOnNextOrErrorWithContext(context -> context.getOrEmpty(TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl)))));


  // Applies contextConsumer to the subscriber's Context on every onNext and onError signal.
  // lift is reactor.core.publisher.Operators.lift (static import).
  private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(Consumer<Context> contextConsumer) {
    return lift((scannable, subscriber) -> new CoreSubscriber<CoreEvent>() {

      // Captured once, when the operator is assembled for this subscriber.
      private final Context context = subscriber.currentContext();

      @Override
      public void onNext(CoreEvent event) {
        contextConsumer.accept(context);
        subscriber.onNext(event);
      }

      @Override
      public void onError(Throwable throwable) {
        contextConsumer.accept(context);
        subscriber.onError(throwable);
      }

      @Override
      public void onComplete() {
        subscriber.onComplete();
      }

      // Expose the same Context upstream so it keeps propagating.
      @Override
      public Context currentContext() {
        return context;
      }

      @Override
      public void onSubscribe(Subscription s) {
        subscriber.onSubscribe(s);
      }
    });
  }

But it should really be possible to do something like this:

.doOnEachWithContext((s, context) -> {
    context.getOrEmpty(TCCL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl));
})
.transform(next)
.doOnEachWithContext((s, context) -> {
    context.getOrEmpty(TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl));
})
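
To make the intended behaviour concrete, here is a minimal, Reactor-free sketch of the set-then-restore pattern the snippets above aim for. It is only an illustration under stated assumptions: a plain Map stands in for the Reactor Context, and the class name TcclContextSketch, the helpers setTcclFrom and runStage, and the two key strings are all hypothetical. It does not show how Reactor would implement a context-aware doOnEach.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the use case: a Map stands in for the Reactor Context.
class TcclContextSketch {

  // Hypothetical keys, playing the role of TCCL_REACTOR_CTX_KEY and TCCL_ORIGINAL_REACTOR_CTX_KEY.
  static final String TCCL_KEY = "tccl";
  static final String TCCL_ORIGINAL_KEY = "tccl.original";

  // Builds a callback that installs the class loader stored under `key`, if one is present.
  static Consumer<Map<String, Object>> setTcclFrom(String key) {
    return context -> Optional.ofNullable(context.get(key))
        .ifPresent(cl -> Thread.currentThread().setContextClassLoader((ClassLoader) cl));
  }

  // Runs `stage` with the class loader from the context, then restores the original one,
  // whether the stage returns normally (the "next" case) or throws (the "error" case).
  static <T, R> R runStage(Map<String, Object> context, T event, Function<T, R> stage) {
    setTcclFrom(TCCL_KEY).accept(context);
    try {
      return stage.apply(event);
    } finally {
      setTcclFrom(TCCL_ORIGINAL_KEY).accept(context);
    }
  }

  public static void main(String[] args) {
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    ClassLoader custom = new URLClassLoader(new URL[0], original);
    Map<String, Object> context = Map.of(TCCL_KEY, custom, TCCL_ORIGINAL_KEY, original);

    // The stage observes the custom class loader while it runs.
    ClassLoader seen = runStage(context, "event", e -> Thread.currentThread().getContextClassLoader());
    System.out.println(seen == custom);
    // Afterwards the original class loader is back in place.
    System.out.println(Thread.currentThread().getContextClassLoader() == original);
  }
}
```

The point of the sketch is that both the normal and the failing path need access to the same context, which is why a context-aware callback for error signals matters as much as one for next signals.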
