Support for simpler use of Context on different signals #947
Description
Ideally, it should be possible to perform simple operations using the context on next/error signals, such as setting or resetting the current thread's context classloader, or setting a ThreadLocal for some legacy code.
Currently, this can be done fairly easily for onNext signals, but when there is no need to do anything with the event itself, it would be preferable to use the current context directly in doOnNext or doOnEach.
For error signals, on the other hand, I could not find any way to do something simple with the current context.
It may not make sense to have a context-aware version of every onX method, but a doOnEach version that receives the current context would at least make these use cases easy to achieve.
Currently, I need to do something like this:
.transform(doOnNextOrErrorWithContext(context -> context.getOrEmpty(TCCL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl))))
.transform(next)
.transform(doOnNextOrErrorWithContext(context -> context.getOrEmpty(TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl)))));
private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(Consumer<Context> contextConsumer) {
  return lift((scannable, subscriber) -> new CoreSubscriber<CoreEvent>() {

    private Context context = subscriber.currentContext();

    @Override
    public void onNext(CoreEvent event) {
      contextConsumer.accept(context);
      subscriber.onNext(event);
    }

    @Override
    public void onError(Throwable throwable) {
      contextConsumer.accept(context);
      subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
      subscriber.onComplete();
    }

    @Override
    public Context currentContext() {
      return context;
    }

    @Override
    public void onSubscribe(Subscription s) {
      subscriber.onSubscribe(s);
    }
  });
}
But it should really be possible to do something like this:
.doOnEachWithContext((s, context) -> {
  context.getOrEmpty(TCCL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl));
})
.transform(next)
.doOnEachWithContext((s, context) -> {
  context.getOrEmpty(TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(cl -> currentThread().setContextClassLoader((ClassLoader) cl));
})