With Spring 5.2.3 I'm seeing a bug when reading server-sent events with the WebClient. The following minimal sample reproduces it; the same code works fine on Spring 5.2.2:
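import static java.util.Collections.emptyMap;

import java.nio.charset.StandardCharsets;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.ServerSentEventHttpMessageReader;
import reactor.core.publisher.Flux;

// Wrapper class added so the sample compiles on its own; the name is arbitrary.
public class SseReaderRepro {
    public static void main(String[] args) {
        ServerSentEventHttpMessageReader reader = new ServerSentEventHttpMessageReader();
        DataBufferFactory factory = new DefaultDataBufferFactory();
        ReactiveHttpInputMessage message = new ReactiveHttpInputMessage() {
            @Override
            public Flux<DataBuffer> getBody() {
                // An SSE comment line followed by the blank line that ends the event block
                return Flux.just(
                        factory.wrap(":ping\n".getBytes(StandardCharsets.UTF_8)),
                        factory.wrap("\n".getBytes(StandardCharsets.UTF_8))
                );
            }
            @Override
            public HttpHeaders getHeaders() {
                return new HttpHeaders();
            }
        };
        Flux<Object> flux = reader.read(ResolvableType.forType(String.class), message, emptyMap());
        flux.collectList().block().forEach(System.out::println);
    }
}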
On Spring 5.2.3, however, the following exception is thrown. It also occurs when the :ping comment is absent and only a newline is received.
Exception in thread "main" java.lang.NullPointerException: The mapper returned a null value.
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:295)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:295)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainSync(FluxFlattenIterable.java:576)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:646)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:273)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.request(FluxBufferPredicate.java:149)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:281)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request(FluxContextStart.java:132)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:281)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.request(FluxPeekFuseable.java:775)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.request(FluxBufferPredicate.java:149)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onSubscribe(MonoCollectList.java:72)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onSubscribe(FluxBufferPredicate.java:180)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onSubscribe(FluxPeekFuseable.java:808)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:185)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onSubscribe(FluxContextStart.java:97)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:185)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onSubscribe(FluxBufferPredicate.java:180)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:215)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:53)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.Mono.block(Mono.java:1662)
at de.codecentric.boot.admin.server.web.InstancesControllerIntegrationTest.main(InstancesControllerIntegrationTest.java:243)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1663)
... 1 more
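For context on why this input is special: the body in the sample contains only an SSE comment line (a line starting with a colon) and the blank line that ends the block, so there is no data to turn into an event. The sketch below is a simplified, standalone illustration of that rule and is not Spring's implementation; the class SseBlockSketch and the method parseEvents are hypothetical names, and it only handles data fields with "\n" line endings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spring or Reactor.
public class SseBlockSketch {

    /** Returns the data payload of every event found in the given SSE text. */
    static List<String> parseEvents(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        // Limit -1 keeps trailing empty strings, so blank lines are not lost.
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the block; emit an event only if data was seen.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line such as ":ping"; it contributes nothing to the event.
                continue;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // drop the single optional leading space
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(parseEvents(":ping\n\n"));              // comment-only block
        System.out.println(parseEvents("\n"));                     // bare newline
        System.out.println(parseEvents(":ping\ndata: hello\n\n")); // comment plus data
    }
}
```

In this sketch the first two inputs, which match the two failing cases described above, produce no events at all, while the third yields a single event with the data "hello". That is the behaviour I would expect from the reader as well: an empty result rather than an error.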
Comment From: OLPMO
Could you provide minimal demo to reproduce the problem?
Comment From: joshiste
> Could you provide minimal demo to reproduce the problem?
There is already code in the issue description to reproduce the problem...