Affects: \5.3.3 (Spring Boot 2.4.2)
Expected
RSocket channel endpoint in my Spring Boot application receives cancellation signal from the inbound, client-driven stream to allow server side cleanup, etc.
Actual
No cancellation, completion, or error signal received by my reactive stream, either Flux or Flow.
Setup
Relevant dependencies: - Spring Boot 2.4.2 - Kotlin 1.4.21 - Kotlinx Coroutines 1.4.2 - RSocket Core 1.1.0 (also tried 1.1.1-SNAPSHOT, which suppresses "Operator called default onErrorDropped" but still doesn't propagate the cancellation)
I have tried to achieve my goal with both Kotlin coroutine Flows and Reactor Flux(en?). Both client/server pairs below should do the same thing: establish an RSocket channel, send 2 "ping" payloads from the client, the server responds to each with a "pong" payload, and the client closes the connection.
Flow server side:
@MessageMapping("testFlow")
fun testPingFlow(input: Flow<String>): Flow<String> {
val cs = CoroutineScope(EmptyCoroutineContext)
val output = MutableSharedFlow<String>(10)
cs.launch {
try {
input
.catch { e ->
logger.error("Rsocket server input error", e)
}
.onCompletion { exception ->
logger.debug("Rsocket server input completed")
if (exception != null) {
logger.error("Exception received while processing Rsocket server input flow", exception)
}
}
// Normal .collect complains about being internal-only
.collectIndexed { _, message ->
logger.debug("Rsocket server input received $message")
output.emit("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
}
return output
}
Flow client side test:
@Test
fun testPingFlow() {
val outToServer = MutableSharedFlow<String>(10)
runBlocking {
val socketFlow = rSocketRequester
.route("testFlow")
.data(outToServer.asFlux())
.retrieveFlow<String>()
.take(2)
outToServer.emit("Ping ${System.currentTimeMillis()}")
outToServer.emit("Ping ${System.currentTimeMillis()}")
socketFlow
.onCompletion { exception ->
logger.debug("Rsocket client output completed")
if (exception != null) {
logger.error("Exception received while processing Rsocket client output flow", exception)
}
}
.collect { message ->
logger.debug("Received pong from server $message")
}
}
}
Flux server side:
@MessageMapping("testFlux")
fun testPingFlux(input: Flux<String>): Flux<String> {
val output = Sinks.many().unicast().onBackpressureBuffer<String>()
try {
input
.doOnNext { message ->
logger.debug("Rsocket server input message received $message")
}
.doOnError { e ->
logger.error("Rsocket server input connection error", e)
}
.doOnCancel {
logger.debug("Rsocket server input cancelled")
}
.doOnComplete {
logger.debug("Rsocket server input completed")
}
.subscribe { message ->
output.tryEmitNext("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
return output.asFlux()
}
Flux client side test:
@Test
fun testPingFlux() {
val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()
rSocketRequester
.route("testFlux")
.data(outToServer.asFlux())
.retrieveFlux<String>()
.doOnCancel {
logger.debug("Rsocket client output connection completed")
}
.doOnError { e ->
logger.error("Exception received while processing Rsocket client output flow", e)
}
.take(2)
.subscribe { message ->
logger.debug("Received pong from server $message")
}
outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
}
The Problem
Both client/server snippets above do in fact send ping/pong payloads back and forth, but in each case I get no handling on the server side of the client cancelling the connection. I get my own log line of Rsocket client output completed
from the client side, then Operator called default onErrorDropped
from Reactor and the following stack trace from RSocket:
java.util.concurrent.CancellationException: Inbound has been canceled
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
This is a problem as (beyond this toy example) my application needs to do server-side cleanup on connection close.
Things I Have Tried
- All the various methods to catch exceptions, cancellation, or completion on Flows or Fluxen, many of which are illustrated in the example above.
- try/catch blocks in the subscribe/collect lambdas.
- Coupling the server response Flux/Flow directly to the input Flux/Flow via a map operator rather than creating a separate output Flux/Flow.
- Stepping through framework code in the debugger, which I am not ashamed to say lost me pretty quickly. My best theory from this adventure is that the Flux/Flow that receives the cancellation signal is somehow decoupled from the input Flux/Flow that my server method receives, but there are too many layers of abstraction for me to trace it.
- I asked about this on Stackoverflow and was told this is 'a bug in the reactor
switchOnFirst
operator which does not propagate an inbound error if outbound has been canceled' and was directed to report a bug here, so here we are. I kind of wonder if this is a Project Reactor bug becauseswitchOnFirst
is part of that project, but I was directed here by Oleh Dokuka, who wrote that class, so I assume he knows what he's talking about.
Comment From: rstoyanchev
This seems to be related to the switchOnFirst
operator. I've created https://github.com/reactor/reactor-core/issues/2598 to see if there is any improvement to be had from the Reactor side. In the mean time you can still detect the cancellation on the response Flux
and propagate to the request Flux
:
val output = Sinks.many().unicast().onBackpressureBuffer<String>()
try {
val disposable = input
.doOnNext { message ->
logger.debug("Rsocket server input message received $message")
}
.doOnError { e ->
logger.error("Rsocket server input connection error", e)
}
.doOnCancel {
logger.debug("Rsocket server input cancelled")
}
.doOnComplete {
logger.debug("Rsocket server input completed")
}
.subscribe { message ->
output.tryEmitNext("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
return output.asFlux().doFinally {
disposable.dispose()
}
}
Comment From: rstoyanchev
@dvankley https://github.com/reactor/reactor-core/issues/2598 has been fixed. You can try removing the above workaround and use 2020.0.7-SNAPSHOT
to test with.
Comment From: dvankley
@rstoyanchev thanks for the follow up. I tried both my example tests above with 2020.0.7-SNAPSHOT
and got the following:
My own "Rsocket server input connection error" log
One for each client-sent message:
java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.cancelAndError(FluxSwitchOnFirst.java:608) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstControlSubscriber.cancel(FluxSwitchOnFirst.java:926) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:384) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560) ~[reactor-netty-http-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:168) ~[reactor-netty-http-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.0.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
followed by this, same as before (one for each client-sent message):
2021-04-27 21:18:03.503 ERROR 98539 --- [ctor-http-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
followed by one of these at the end:
java.util.concurrent.CancellationException: Inbound has been canceled
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560) ~[reactor-netty-http-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:168) ~[reactor-netty-http-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7-20210427.092834-20.jar:1.0.7-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.0.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Seems the reactor fix didn't resolve this? Or maybe I didn't import the snapshot JARs correctly (although it looks like I did)?
Comment From: rstoyanchev
Thanks for giving it a try before the release. I think based on the fix, a CancellationException
is supposed to be sent to the input
stream in case it is subscribed to independently. However, I'm not sure if the output you're getting indicates that or not. It seems like it might not be.
Can you confirm that you removed the workaround I suggested with output.asFlux().doFinally { disposable.dispose() }
and likewise do you have any indication if the input Flow
is terminated via error? That would be the "Rsocket server input connection error" from your sample code.
/cc @OlegDokuka
Comment From: dvankley
Can you confirm that you removed the workaround I suggested with output.asFlux().doFinally { disposable.dispose() }
Confirmed. Client side (same as before plus a sleep to keep it from terminating early, which is probably the wrong way to do it but whatever):
@Test
fun testPingFlux() {
val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()
rSocketRequester
.route("testFlux")
.data(outToServer.asFlux())
.retrieveFlux<String>()
.doOnCancel {
logger.debug("Rsocket client output connection completed")
}
.doOnError { e ->
logger.error("Exception received while processing Rsocket client output flow", e)
}
.take(2)
.subscribe { message ->
logger.debug("Received pong from server $message")
}
outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
Thread.sleep(5000)
}
Server side (same as before, no workaround):
@MessageMapping("testFlux")
fun testPingFlux(input: Flux<String>): Flux<String> {
val output = Sinks.many().unicast().onBackpressureBuffer<String>()
try {
input
.doOnNext { message ->
logger.debug("Rsocket server input message received $message")
}
.doOnError { e ->
logger.error("Rsocket server input connection error", e)
}
.doOnCancel {
logger.debug("Rsocket server input cancelled")
}
.doOnComplete {
logger.debug("Rsocket server input completed")
}
.subscribe { message ->
output.tryEmitNext("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
return output.asFlux()
}
do you have any indication if the input Flow is terminated via error? That would be the "Rsocket server input connection error" from your sample code.
Yes.
Relevant output from my test run (`GameController` is the name of the class the server test class):
2021-04-29 21:41:02.418 DEBUG 77111 --- [ctor-http-nio-2] o.s.m.rsocket.DefaultMetadataExtractor : Values extracted from metadata: {} with registrations for [].
2021-04-29 21:41:02.425 DEBUG 77111 --- [ctor-http-nio-2] o.s.m.rsocket.DefaultMetadataExtractor : Values extracted from metadata: {route=testFlux} with registrations for [].
2021-04-29 21:41:02.431 DEBUG 77111 --- [ctor-http-nio-2] n.d.f.web.controllers.GameController : Rsocket server input message received Ping 1619746862245
2021-04-29 21:41:02.437 DEBUG 77111 --- [actor-tcp-nio-2] n.d.f.web.GameControllerSocketPingTest : Received pong from server pong 1619746862431
2021-04-29 21:41:02.437 DEBUG 77111 --- [ctor-http-nio-2] n.d.f.web.controllers.GameController : Rsocket server input message received Ping 1619746862245
2021-04-29 21:41:02.438 DEBUG 77111 --- [actor-tcp-nio-2] n.d.f.web.GameControllerSocketPingTest : Received pong from server pong 1619746862437
2021-04-29 21:41:02.438 DEBUG 77111 --- [actor-tcp-nio-2] n.d.f.web.GameControllerSocketPingTest : Rsocket client output connection completed
2021-04-29 21:41:02.444 ERROR 77111 --- [ctor-http-nio-2] n.d.f.web.controllers.GameController : Rsocket server input connection error
java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.cancelAndError(FluxSwitchOnFirst.java:608) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstControlSubscriber.cancel(FluxSwitchOnFirst.java:926) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:384) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:168) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.0.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2021-04-29 21:41:02.444 ERROR 77111 --- [ctor-http-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
Caused by: java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.cancelAndError(FluxSwitchOnFirst.java:608) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstControlSubscriber.cancel(FluxSwitchOnFirst.java:926) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:384) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:168) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.0.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2021-04-29 21:41:02.445 ERROR 77111 --- [ctor-http-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
java.util.concurrent.CancellationException: Inbound has been canceled
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6-20210426.131931-6.jar:3.4.6-SNAPSHOT]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:168) ~[reactor-netty-http-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7-20210429.072500-22.jar:1.0.7-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.0.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2021-04-29 21:41:09.386 INFO 77111 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
Comment From: dvankley
@rstoyanchev just circling back on this. With reactor-core:3.4.10
and rsocket-core:1.1.1
I'm still seeing the same problem as described in the previous message. In summary, at this point I do get exception events that I can handle server side to perform cleanup so functionally I'm fine, but I continue to see the following errors, which muck up my logs and undermine my confidence:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
Caused by: java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.cancelAndError(FluxSwitchOnFirst.java:608) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstControlSubscriber.cancel(FluxSwitchOnFirst.java:926) ~[reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:385) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:346) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:226) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.10.jar:3.4.10]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.5.2.jar:5.5.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:579) ~[reactor-netty-http-1.0.11.jar:1.0.11]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:167) ~[reactor-netty-http-1.0.11.jar:1.0.11]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.68.Final.jar:4.1.68.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2021-11-06 16:01:22.317 ERROR 61070 --- [ctor-http-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$new$0(WebsocketDuplexConnection.java:54) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1182) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:773) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:749) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:769) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:526) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2021-11-06 16:01:22.318 ERROR 61070 --- [actor-tcp-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped (this isn't a typo, I get this error twice)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$new$0(WebsocketDuplexConnection.java:54) ~[rsocket-transport-netty-1.1.1.jar:na]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1182) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:773) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:749) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Thanks.