Affects: spring-core-6.1.1 Given this code:
Mono.just("a")
.flatMap(a -> {
return Mono.deferContextual(c -> c.get("KEY"))
.flatMap(val -> {
System.out.println(val);
return ReactiveWrapperConverters.toWrapper(Uni.createFrom().item("abcd"), Mono.class);
});
}).contextWrite(Context.of("KEY", Mono.just("VALUE")))
.subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());
Execute it will print to console:
VALUE
abcd
But with this code:
Mono.just("a")
.flatMap(a -> {
return Mono.deferContextual(c -> c.get("KEY"))
.flatMap(val -> {
System.out.println(val);
return ReactiveWrapperConverters.toWrapper(ReactiveWrapperConverters.toWrapper(Mono.deferContextual(c -> c.get("KEY")), Uni.class).flatMap(v -> {
System.out.println(v);
return Uni.createFrom().item("abcd");
}), Mono.class);
});
}).contextWrite(Context.of("KEY", Mono.just("VALUE")))
.subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());
Will print:
val
java.util.NoSuchElementException: Context is empty
at reactor.util.context.Context0.get(Context0.java:43)
at com.example.demo.DemoApplication.lambda$main$1(DemoApplication.java:58)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at org.reactivestreams.FlowAdapters$FlowPublisherFromReactive.subscribe(FlowAdapters.java:366)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.forward(UniCreateFromPublisher.java:41)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher.subscribe(UniCreateFromPublisher.java:26)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
at org.reactivestreams.FlowAdapters$ReactiveToFlowSubscription.request(FlowAdapters.java:182)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onSubscribe(FlowAdapters.java:206)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
at org.reactivestreams.FlowAdapters$ReactivePublisherFromFlow.subscribe(FlowAdapters.java:348)
at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4496)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
at reactor.core.publisher.Mono.subscribe(Mono.java:4386)
at com.example.demo.DemoApplication.main(DemoApplication.java:65)
The second Mono.deferContextual(c -> c.get("KEY"))
(inside ReactiveWrapperConverters.toWrapper
) is not working.
This is what ReactiveWrapperConverters.toWrapper
(from spring-data-commons) do:
@Override
@SuppressWarnings({ "ConstantConditions", "unchecked" })
public <T> Converter<Object, T> getConverter(Class<T> targetType) {
return source -> {
Publisher<?> publisher = source instanceof Publisher ? (Publisher<?>) source
: RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(Publisher.class, source).toPublisher(source);
ReactiveAdapter adapter = RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(targetType);
return (T) adapter.fromPublisher(publisher);
};
}
Comment From: simonbasle
This is expected, as Reactor Context cannot be propagated through other flavours of Publisher
.
In fact, "propagating" is not the best word to describe how context is accessed: in a reactive stream chain, each operator's subscriber has access to the next subscriber in line (to pass data down the chain), and in the case of Reactor-to-Reactor chains it means that an operator can also call CoreSubscriber#currentContext()
. If in the middle of the chain there is a Subscriber
that is not a Reactor's CoreSubscriber
this prevents access to Context defined further down the line.