env: k8s Spring-Boot Version:2.6.3 Spring-cloud:3.1.0 java:11 webflux

it see like to set cookie when appear this error:

The example provides an interface request, which can be reproduced 100% as long as the request with a cookie

sample project: SampleProject.zip

Error finishing response. Closing connection

java.lang.IllegalStateException: Status and headers already sent
    at reactor.netty.http.server.HttpServerOperations.addHeader(HttpServerOperations.java:243)
    at org.springframework.http.server.reactive.ReactorServerHttpResponse.applyCookies(ReactorServerHttpResponse.java:110)
    at org.springframework.http.server.reactive.AbstractServerHttpResponse.lambda$doCommit$14(AbstractServerHttpResponse.java:292)
    at reactor.core.publisher.MonoRunnable.subscribe(MonoRunnable.java:49)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
    at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:81)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onComplete(FluxDoOnEach.java:223)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace$WebFilterTraceSubscriber.onComplete(TraceWebFilter.java:281)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:234)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:234)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:234)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorSubscriber.hookOnComplete(SentinelReactorSubscriber.java:136)
    at com.alibaba.csp.sentinel.adapter.reactor.InheritableBaseSubscriber.onComplete(InheritableBaseSubscriber.java:191)
    at com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorSubscriber.onComplete(SentinelReactorSubscriber.java:37)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:234)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:103)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:196)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:268)
    at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
    at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
    at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
    at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
    at io.netty.channel.epoll.AbstractEpollChannel.doWriteBytes(AbstractEpollChannel.java:364)
    at io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytes(AbstractEpollStreamChannel.java:260)
    at io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteSingle(AbstractEpollStreamChannel.java:471)
    at io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:429)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.flush0(AbstractEpollChannel.java:557)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829) 

Comment From: poutsma

If you'd like us to spend some time investigating, please take the time to provide a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem.

Comment From: fanpocha

@poutsma sample had uploaded

Comment From: poutsma

Sorry, I somehow looked over that.

Comment From: fanpocha

@poutsma 1、Start the sample project SampleProject.zip 2、Call the interface according to the following figure, and the problem will appear Spring Status and headers already sent

Comment From: poutsma

What exactly are you trying to accomplish in your filter? It is not clear to me.

The code does reveal some major issues that should be fixed before we can proceed.

  • You're subclassing ReactorServerHttpResponse. That class is an internal class, not to be directly used by anyone else but Spring Framework. That is why it is package protected, which means that you only have acces to it from the same package. As a Spring user, you get an instance of this class through the ReactorHttpHandlerAdapter, but you're never going to instantiate a concrete instance. You "solved" the issue by putting your code in the org.springframework.http.server.reactive package. Don't do that; use the public interfaces and classes instead.
  • You obtain the WebSession by calling exchange.getSession().toFuture().join() which blocks the thread. Don't block in reactive web filter; you want to compose on the result. It doesn't look like you actually use the session later on in the code, but the blocking has already been done at that point.
  • You convert body buffers into strings using UTF-8 using a ByteArrayOutputStream.
  • You can obtain a String using DataBuffer.toString(Charset), no need for the the output stream.
  • You should not assume that request data is UTF-8, you should look at the request Content-Type to see what the actual charset is is.
  • More importantly, converting body byte chunks into UTF-8 will fail when a non-ASCII, multi-byte character is split across multiple buffers. If you want to convert to a String, join all buffers into one using DataBufferUtils.join, and convert that to a string. Or use Spring's StringDecoder, which breaks on newline characters.

What I think you're trying to do is invoke custom business logic whenever the response gets committed. You can do that by decorating the whole response with the ServerHttpRequestDecorator, override getBody, and compose on the result of the super class, possible by calling Flux.then. Read the Reactor Reference Guide for more information.

Comment From: fanpocha

@poutsma @rstoyanchev This problem is really caused by getting the session. I need to get user information from the session. Is there any other scheme to obtain session

Comment From: fanpocha

@poutsma Here, you must use the class reactorserverhttppresponse, because the zerocopyhttpoutputmessage implementation class used in the interface is used to download files. If decorative classes are used, class conversion exceptions will occur

Comment From: rstoyanchev

@fanpocha, using .join() or .get() on a Future defeats the purpose of asynchronous handling, and in production you'll be out of event loop threads in no time. You'll need to learn a bit of Reactor, but conceptually it's no different than composing on CompletableFuture or on java.util.Stream. You don't exit the chain to access a value. You compose within it.

As for the rest, package protected classes are not meant for use by applications. You've got to look for options with the available public API, or otherwise you are crossing boundaries that aren't meant to be crossed, which is your choice but not something we are going to support.

Comment From: fanpocha

@rstoyanchev 1、I changed the way of writing about the session problem, which should not exist Spring Status and headers already sent

2、As for the reactorserverhtpresponse class, I made it clear for the previous reply that the default decorative class serverhttpresponsedecorator cannot meet the requirements because of functional requirements

3、And I think this is really a bug and should not be repeated,should be fix

Comment From: fanpocha

Closing the original link can solve the problem