Affects: 6.1.5

Description

A @RestController method that produces Flux<T> as MediaType.APPLICATION_JSON_VALUE fails to encode an error handled by @RestControllerAdvice when the Flux emits at least one onNext signal followed by an onError signal. By the time the error arrives, the ServerHttpResponse has already been committed and the first item from the Flux has been partially transferred to the client, so AbstractJackson2Encoder cannot encode the error response. The result is a stack trace that looks like this:

2024-04-13T01:08:17.665+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [7ac50042-1] Error [java.lang.UnsupportedOperationException] for HTTP GET "/failEncoder", but ServerHttpResponse already committed (200 OK)
2024-04-13T01:08:17.666+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [7ac50042-1, L:/127.0.0.1:8080 - R:/127.0.0.1:52702] Error finishing response. Closing connection

java.lang.UnsupportedOperationException: null
    at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Exception handler com.example.demo.ExceptionHandlers#handleAllErrors(Exception), error="FAIL state encountered" [DispatcherHandler]
    *__checkpoint ⇢ HTTP GET "/failEncoder" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
        at org.springframework.http.HttpHeaders.setContentLength(HttpHeaders.java:967) ~[spring-web-6.1.5.jar:6.1.5]
        at org.springframework.http.codec.EncoderHttpMessageWriter.lambda$write$1(EncoderHttpMessageWriter.java:135) ~[spring-web-6.1.5.jar:6.1.5]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2842) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2573) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2909) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:85) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.4.jar:3.6.4]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:418) ~[spring-web-6.1.5.jar:6.1.5]
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:208) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280) ~[reactor-core-3.6.4.jar:3.6.4]
        at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:352) ~[reactor-netty-core-1.1.17.jar:1.1.17]
        at reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:434) ~[reactor-netty-core-1.1.17.jar:1.1.17]
        at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:598) ~[reactor-netty-core-1.1.17.jar:1.1.17]
        at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:118) ~[reactor-netty-core-1.1.17.jar:1.1.17]
        at io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:748) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:303) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:383) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:937) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:800) ~[reactor-netty-core-1.1.17.jar:1.1.17]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Testing with Mozilla Firefox client

[Screenshots: failEncoder, failEncoderMultiple]

Testing with Postman (v9.25.2)

[Screenshot: postman_error]

Steps to reproduce

Demo project with reproducible issue

Comment From: bclozel

What you have described is the expected behavior. If the response has been committed already, headers and partial body might have been sent to the client over the network already.

Are you requesting a change of behavior here? Can you explain what it is?

Comment From: Airidas36

Previously, AbstractJackson2Encoder would accumulate the Flux signals behind the scenes by calling .collectList() and then serialize the data. If I declare that I produce the non-streaming media type application/json and the Flux has a finite number of items, I would expect the framework to handle serialization, or to propagate the error if the Flux contains an onError. But it seems that the contents of the Flux are streamed anyway, even when application/json is specified, and encoding fails on the first onError signal if an onNext came before it.

As I see it, a controller method with return type Flux<T> and a non-streaming media type is only supported when no onError signal follows an onNext in the Flux. I think it should be documented that this approach should only be used when no onError signals are present, or that users should handle them themselves before the Flux reaches the encoder, or simply that Mono<List<T>> is recommended for producing non-streaming types.

Comment From: rstoyanchev

We did switch from collecting into a List to streaming. This reduces memory usage and still lets you collect via Flux#collectList() if needed, whereas with collecting by default there is no way to stream. I think it is a good idea to add something to the documentation that explains this, so I'll turn this into a documentation issue.
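
The difference between the two strategies can be illustrated with a small simulation. This is only a sketch in plain Java, not Spring or Reactor code: FakeResponse, CommitDemo and item are hypothetical names invented for the illustration, and the "commit on first write" rule is a simplified assumption about how a real ServerHttpResponse behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HTTP response (not a Spring class).
// Assumption: the first body write commits the status line and headers.
final class FakeResponse {
    private int status = 200;
    private boolean committed = false;
    private final StringBuilder body = new StringBuilder();

    void setStatus(int newStatus) {
        if (committed) {
            throw new UnsupportedOperationException("response already committed");
        }
        status = newStatus;
    }

    void write(String chunk) {
        committed = true;
        body.append(chunk);
    }

    int status() { return status; }
    String body() { return body.toString(); }
}

final class CommitDemo {
    // Hypothetical source: one successful item (onNext), then a failure (onError).
    static String item(int index) {
        if (index == 0) {
            return "{\"id\":0}";
        }
        throw new IllegalStateException("FAIL state encountered");
    }

    // Streaming: each item is written as soon as it is produced.
    static FakeResponse streamed() {
        FakeResponse response = new FakeResponse();
        try {
            for (int i = 0; i < 2; i++) {
                String json = item(i);                       // fails for i == 1
                response.write((i == 0 ? "[" : ",") + json); // commits on first write
            }
            response.write("]");
        } catch (IllegalStateException e) {
            try {
                response.setStatus(500);                     // too late to change the status
            } catch (UnsupportedOperationException ignored) {
                // The client keeps the 200 status and a truncated body.
            }
        }
        return response;
    }

    // Collecting: nothing is written until every item has been produced.
    static FakeResponse collected() {
        FakeResponse response = new FakeResponse();
        List<String> buffer = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                buffer.add(item(i));
            }
            response.write("[" + String.join(",", buffer) + "]");
        } catch (IllegalStateException e) {
            response.setStatus(500);                         // still uncommitted
            response.write("{\"error\":\"" + e.getMessage() + "\"}");
        }
        return response;
    }

    public static void main(String[] args) {
        FakeResponse s = streamed();
        FakeResponse c = collected();
        System.out.println(s.status() + " " + s.body());
        System.out.println(c.status() + " " + c.body());
    }
}
```

In the streamed variant the error arrives after the first item has been written, so the status can no longer change and the body is left truncated. In the collected variant the error is seen before anything is written, which is the effect of returning Mono<List<T>> (for example via Flux#collectList()) from the controller, at the cost of buffering every item in memory.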

Comment From: Airidas36

But wouldn't it make sense to choose the encoding approach based on the media type being produced? As a user, if my intention is to return application/json, I would expect the Flux to be collected into a List and any error signals to be thrown up the stack. Streaming would of course make sense if I'm producing application/x-ndjson.

Comment From: rstoyanchev

The trade-off is additional memory to aggregate, and if we aggregate by default, there is no way to get it to behave whichever way you prefer. This way, you can choose to aggregate or not. I suppose we could make this configurable in some way, if you would be okay to switch it wholesale.