Affects: 6.1.5
Description
@RestController
which is producing Flux<T>
as MediaType.APPLICATION_JSON_VALUE
will fail to encode an error, handled by @RestControllerAdvice
, if the Flux contains multiple signals starting with the first one onNext
followed by onError
(s) after it.
Such signal arrangement in the Flux
will result in a stacktrace that looks like this as the AbstractJackson2Encoder
will fail to encode the error because the ServerHttpResponse was already committed and partially transferred to the client (the first item from the Flux
with onNext
):
2024-04-13T01:08:17.665+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [7ac50042-1] Error [java.lang.UnsupportedOperationException] for HTTP GET "/failEncoder", but ServerHttpResponse already committed (200 OK)
2024-04-13T01:08:17.666+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [7ac50042-1, L:/127.0.0.1:8080 - R:/127.0.0.1:52702] Error finishing response. Closing connection
java.lang.UnsupportedOperationException: null
at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Exception handler com.example.demo.ExceptionHandlers#handleAllErrors(Exception), error="FAIL state encountered" [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/failEncoder" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
at org.springframework.http.HttpHeaders.setContentLength(HttpHeaders.java:967) ~[spring-web-6.1.5.jar:6.1.5]
at org.springframework.http.codec.EncoderHttpMessageWriter.lambda$write$1(EncoderHttpMessageWriter.java:135) ~[spring-web-6.1.5.jar:6.1.5]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2842) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2573) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2909) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:85) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.4.jar:3.6.4]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:418) ~[spring-web-6.1.5.jar:6.1.5]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:208) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:352) ~[reactor-netty-core-1.1.17.jar:1.1.17]
at reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:434) ~[reactor-netty-core-1.1.17.jar:1.1.17]
at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:598) ~[reactor-netty-core-1.1.17.jar:1.1.17]
at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:118) ~[reactor-netty-core-1.1.17.jar:1.1.17]
at io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:748) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:303) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:383) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:937) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:800) ~[reactor-netty-core-1.1.17.jar:1.1.17]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Testing with Mozilla Firefox client
Testing with Postman (v9.25.2)
Steps to reproduce
Demo project with reproducible issue
Comment From: bclozel
What you have described is the expected behavior. If the response has been committed already, headers and partial body might have been sent to the client over the network already.
Are you requesting a change of behavior here? Can you explain what it is?
Comment From: Airidas36
What you have described is the expected behavior. If the response has been committed already, headers and partial body might have been sent to the client over the network already.
Are you requesting a change of behavior here? Can you explain what it is?
Previously, AbstractJackson2Encoder
would accumulate the Flux
signals behind the scenes itself by calling .collectList()
, and serializing the data. If I specify that I produce a non-streaming media type of application/json
and have a finite amount of flux items, I would expect the framework to handle serialization, or error propagation from the Flux
if it contained any onError
. But it seems that the contents of Flux
are streamed anyways, even if application/json
is specified and fails upon the first encountered onError
signal if onNext
was present before.
As I see, having a method in the controller with a return type Flux<T>
and non-streaming media type is only supported for cases when no subsequent onError
signals are present in the Flux
. I think it should documented that this approach should only be used for cases when no onError
signals are present, or the user should handle them himself before sending it over to the encoder, or just state that Mono<List<T>>
is recommended for producing non-streaming types.
Comment From: rstoyanchev
We did switch from collecting into a List to streaming. This reduces memory usage but also provides control to collect via Fux#collectToList()
if needed while with collecting by default there is no way to stream. I think it is a good idea to add something to the documentation that explains this so I'll turn this into a documentation issue.
Comment From: Airidas36
We did switch from collecting into a List to streaming. This reduces memory usage but also provides control to collect via
Fux#collectToList()
if needed while with collecting by default there is no way to stream. I think it is a good idea to add something to the documentation that explains this so I'll turn this into a documentation issue.
But wouldn't it make sense to approach encoding based on the media type that is supposed to be produced? As a user, if my intention is to return an application/json
, I would expect the Flux
is collected to a List
and any error signals are thrown up the stack. Streaming of course would make sense if I'm producing application/x-ndjson
.
Comment From: rstoyanchev
The trade-off is additional memory to aggregate, and if we aggregate by default, there is no way to get it to behave whichever way you prefer. This way, you can choose to aggregate or not. I suppose we could make this configurable in some way, if you would be okay to switch it wholesale.