Kacper Klepacz opened SPR-16688 and commented

Cancelling an SSE subscription on the client side (e.g. when you are no longer interested in it) via Disposable.dispose() causes the following error on the server side:

2018-04-10 15:17:57.402 ERROR 8220 --- [ctor-http-nio-4] o.s.w.s.adapter.HttpWebHandlerAdapter    : Unhandled failure: An established connection was aborted by the software in your host machine, response already set (status=200)
2018-04-10 15:17:57.411 ERROR 8220 --- [ctor-http-nio-4] o.s.h.s.r.ReactorHttpHandlerAdapter      : Handling completed with error

java.io.IOException: An established connection was aborted by the software in your host machine
    at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_161]
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_161]
    at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_161]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_161]
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:38) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

2018-04-10 15:18:34.700  INFO 8220 --- [pool-1-thread-1] com.example.SampleEmitter                : Added a2eae85c-ee7b-4c1a-a65a-bdecdb34c4d7
2018-04-10 15:18:34.710 ERROR 8220 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations    : [HttpServer] Error processing connection. Requesting close the channel

java.io.IOException: An established connection was aborted by the software in your host machine
    at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_161]
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_161]
    at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_161]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_161]
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:6873) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) [netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) [netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.22.Final.jar:4.1.22.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

Client example:

    Disposable disposable = WebClient.create("http://localhost:8080")
            .get()
            .uri("/objects")
            .retrieve()
            .bodyToFlux(MyObject.class)
            .subscribe(n -> log.info("Next: {}", n.getId()),
                    e -> log.error("Error: {}", e),
                    () -> log.info("Completed"));

    // I want to gracefully cancel the subscription when I'm no longer
    // interested in it (for the sake of this example, after 10 seconds)
    Thread.sleep(10000);
    disposable.dispose();

Full, ready-to-run example available here: https://github.com/kklepacz/webflux-cancel-subscription
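For context, the server endpoint in that example is essentially an infinite SSE stream; a simplified sketch looks like the following (the exact code, including the real MyObject, is in the repository above, so treat the names and the one-second interval here as assumptions):

    import java.time.Duration;
    import java.util.UUID;

    import reactor.core.publisher.Flux;

    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class ObjectsController {

        // An infinite stream: it never completes on its own, so the only way
        // it ends is the client cancelling -- exactly what triggers the
        // server-side stack trace above.
        @GetMapping(path = "/objects", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<MyObject> objects() {
            return Flux.interval(Duration.ofSeconds(1))
                    .map(i -> new MyObject(UUID.randomUUID().toString()));
        }
    }

    // Assumed shape of MyObject, matching the getId() call in the client code.
    public class MyObject {
        private final String id;
        public MyObject(String id) { this.id = id; }
        public String getId() { return this.id; }
    }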


Affects: 5.0.4

Attachments:
- tomcat_windows.txt (18.96 kB)
- undertow_windows.txt (7.75 kB)

Issue Links: - #22189 Improve error handling in reactive SSE support when the remote channel is closed

Referenced from: commits https://github.com/spring-projects/spring-framework/commit/75a41db071a4fb0a07bf95e4db48c8e93c9dd2d0, https://github.com/spring-projects/spring-framework/commit/7aba6ca9d661399c983890f0134a37f40b8e835c

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Can you clarify "graceful unsubscribe"? What is your expectation?

Comment From: spring-projects-issues

Kacper Klepacz commented

No stack trace with an error, because this is not an error situation for me. The expected behavior would be no message on the server side at all, or just some basic info that someone unsubscribed. This stack trace also occurs when someone "kills" the connection between server and client (e.g. turns off the client application without unsubscribing). To me, the two following situations are very different and should be handled differently (please correct me if I'm wrong):

1. If someone didn't unsubscribe but just killed the app, or the connection went down, the server realizes that and throws an error that something unexpected happened and the established connection was aborted.
2. If someone intentionally calls dispose(), the server treats it as a normal situation (and does some cleaning if needed; I don't know the details).

For me, calling dispose() is similar to unsubscribe() in RxJS, and an RxJS Subscription is close to a Disposable in Reactor. In Angular 5 it is good practice to call subscription.unsubscribe() in the ngOnDestroy component lifecycle hook. So calling dispose() or unsubscribe() is a common scenario for me and should not cause a stack trace to be printed with info about the connection being aborted. Of course I have tried an Angular 5 client with unsubscribe() calls; unfortunately the result is the same.

Comment From: spring-projects-issues

Sébastien Deleuze commented

I can't reproduce the issue on my Linux laptop. What are your OS and Java versions? Do you always get this stack trace on the server side when the client calls dispose()?

Comment From: spring-projects-issues

Kacper Klepacz commented

Unfortunately, every time I call dispose() I receive the mentioned error. Of course calling dispose() again has no effect and produces no stack trace, because I'm not subscribed anymore.

Java version: 1.8.0_161 (also tested on builds 151 and 144)
OS: Windows 7 SP1 Build 7601

After your comment I tested my code on macOS (Java 1.8.0_151) and there was no error there either... So it looks like the Windows integration is the failure point here. I will test it on Windows 10 later today.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Maybe on Linux/Unix the error message contains "broken pipe"?

Comment From: spring-projects-issues

Sébastien Deleuze commented

Good point, I have just checked and that is indeed the case.

Kacper Klepacz Any chance you could check with the other engines (Undertow, Tomcat, Jetty) under Windows to see what exception (including the message) you get in this use case, in order to give us the relevant information to eventually fine-tune the logging?

Comment From: spring-projects-issues

Kacper Klepacz commented

First of all, I confirm that I've observed the same behavior on Windows 10.

Sébastien, I assumed that I should use Undertow/Tomcat/Jetty just for the server application (the client's container does not matter). You can check the updated pom.xml files on GitHub. If so, these are the results:

1. Netty, Undertow, Tomcat: the message, exception and ~~stacktrace~~ are exactly the same as in the original post.
2. Jetty: no stack trace at all :)

I checked the method mentioned by Rossen in debug mode, and an EofException is thrown while using Jetty on Windows. So the isDisconnectedClientError method covers Jetty on Windows properly.

Comment From: spring-projects-issues

Kacper Klepacz commented

I believe there is one more thing worth noting. You probably should not code against the message itself in this case because, for example, on my private laptop the Throwable that comes in as the parameter of isDisconnectedClientError(Throwable ex) contains a localized message like:

    java.io.IOException: Nawiązane połączenie zostało przerwane przez oprogramowanie zainstalowane w komputerze-hoście

which is exactly "java.io.IOException: An established connection was aborted by the software in your host machine", but in Polish in my case, because of my locale (PowerShell Get-Culture output below, followed by a short sketch of the problem):

LCID             Name           DisplayName
----             ----           -----------
1045             pl-PL          Polski (Polska)
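To illustrate, a minimal sketch (hypothetical helper names, purely for demonstration) of why matching on the message breaks while the exception type stays stable across locales:

    import java.io.IOException;

    public class LocaleDemo {

        // Fragile: the JDK localizes the OS-level message, so on a pl-PL
        // machine this returns false for the very same abort condition.
        static boolean abortedByMessage(Throwable ex) {
            String message = ex.getMessage();
            return message != null && message.contains("connection was aborted");
        }

        // Locale-independent, but IOException alone is too broad to identify
        // a client disconnect -- hence the idea of a more specific wrapper
        // exception discussed below.
        static boolean abortedByType(Throwable ex) {
            return ex instanceof IOException;
        }
    }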

Comment From: spring-projects-issues

Sébastien Deleuze commented

Thanks for this detailed feedback. Given the generic nature of the exception, and a message that can vary, I am not sure we can do something that covers all engines, but we can probably at least improve things with Reactor Netty.

Possible improvements I have in mind (a small sketch follows the list):

- Add Reactor Netty's AbortedException to HttpWebHandlerAdapter#DISCONNECTED_CLIENT_EXCEPTIONS
- Update Reactor Netty to follow the WebFlux logging strategy; for example, ChannelOperations#discreteRemoteClose could log the stack trace only at TRACE level, not at DEBUG or ERROR, since it provides very little added value and these errors are common
- Make sure Reactor Netty logs or emits the error, not both, to avoid double-logging issues
- Wrap the IOException emitted in ChannelOperations#discreteRemoteClose into a more specific one (maybe AbortedException?) to be able to filter it on the Spring WebFlux side
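As a rough sketch of what the first and last points would enable (class names are taken from this discussion; this is not the actual Spring code):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class DisconnectedClientDetection {

        // Once Reactor Netty wraps the IOException in AbortedException, the
        // server adapter can detect a disconnected client by exception type
        // rather than by a (possibly localized) message.
        private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS =
                new HashSet<>(Arrays.asList(
                        "AbortedException",      // Reactor Netty (proposed above)
                        "ClientAbortException",  // Tomcat
                        "EofException"));        // Jetty, as Kacper observed

        public static boolean isDisconnectedClientError(Throwable ex) {
            return DISCONNECTED_CLIENT_EXCEPTIONS.contains(ex.getClass().getSimpleName());
        }
    }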

Kacper Klepacz Your log output seems to indicate that the error is logged at least by Reactor Netty itself via ChannelOperations#discreteRemoteClose, but I am not sure whether it is logged a second time by WebFlux. Could you please check?

Comment From: spring-projects-issues

Kacper Klepacz commented

My bad, I somehow skipped the very first 2 lines of the logs when pasting them into the original JIRA. Now you should be able to see the full message. So, the first one comes from ReactorHttpHandlerAdapter and the second one comes from ChannelOperations, as you said.

Comment From: spring-projects-issues

Sébastien Deleuze commented

OK, thanks, so indeed we have a double-logging issue as expected.

Any thoughts Rossen Stoyanchev?

Comment From: spring-projects-issues

Kacper Klepacz commented

After your comment about double logging, I realized that I was not careful enough while checking those logs. Even if the message is the same, Undertow and Tomcat produce different outputs. Undertow logs the stack trace just once. Tomcat logs it twice, plus an additional "Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception [Write publisher error] with root cause". Logs are in the attachments.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

There are 3 places in o.s.http.server.reactive that use logger.error which we should change to logger.warn.

Beyond that, if we can get Reactor Netty to signal a more specific exception, that would help with detecting disconnected clients more reliably. The same is true for Tomcat and Undertow: we should ask them to raise more specific exceptions if feasible. For Tomcat, I'm not sure if this comment means the behavior changed at some point? For Undertow, it seems we have always relied on the exception message, but that's apparently not reliable, as this ticket shows.
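Roughly, the logging change would look like this (a sketch only, reusing the type-based detection sketched earlier; not the actual diff):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class FailureLogging {

        private static final Log logger = LogFactory.getLog(FailureLogging.class);

        void handleFailure(Throwable ex) {
            if (DisconnectedClientDetection.isDisconnectedClientError(ex)) {
                // Routine: the client cancelled or dropped the connection, so
                // a one-line WARN without a stack trace is enough.
                logger.warn("Client went away: " + ex.getMessage());
            }
            else {
                // Genuinely unexpected failures keep ERROR with the stack trace.
                logger.error("Unhandled failure", ex);
            }
        }
    }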

Comment From: spring-projects-issues

Kacper Klepacz commented

I believe this is a matter of Tomcat/Undertow/Netty integration with Windows. I just ran my example with Tomcat and Undertow on a RedHat Linux server. There are no errors or stack traces at all (correct Netty behavior on Linux/Unix was already confirmed by Sébastien). I believe the best solution would be more consistent error signals for Tomcat/Undertow/Netty across different platforms.

Comment From: spring-projects-issues

Sébastien Deleuze commented

We have done various fine-tuning to make the logging more relevant:

- Reactor Netty no longer logs errors in addition to throwing the exception, see reactor-netty#339
- The WebFlux server now uses the WARN log level instead of ERROR
- Only the message is printed in the log, not the stack trace, which is mostly useless in the reactive world

For now I don't filter the "established connection was aborted" message, because it won't match on non-English locales, and I think the other improvements make this single warning message much more acceptable than 2 verbose stack traces.

Comment From: wangxfholly

In which release version was this fixed?