I have a problem with RSocket. I am using JDK 17 and Spring boot 3.2.x.
Sample project to reproduce the issue https://github.com/abdulrahimseera/rsocket-spring-boot-3.2.x - working perfectly fine before 3.2.0
Error stack trace
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Caused by: java.lang.IllegalStateException: Source has to be ASYNC fuseable
at io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onSubscribe(InMemoryResumableFramesStore.java:528) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.internal.UnboundedProcessor.subscribe(UnboundedProcessor.java:414) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4339) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.resume.ResumableDuplexConnection.<init>(ResumableDuplexConnection.java:83) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.ServerSetup$ResumableServerSetup.acceptRSocketSetup(ServerSetup.java:124) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.acceptSetup(RSocketServer.java:418) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.accept(RSocketServer.java:386) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.lambda$acceptor$0(RSocketServer.java:370) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:332) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Comment From: bclozel
Could you report this against the https://github.com/rsocket/rsocket-java/issues project directly? I don't think anything here is related to Spring Boot and it looks like an RSocket/Reactor problem.
Note: you can also simplify your sample as the observability code is not required at all to reproduce the problem. Also, you should let the RSocket team know that they need to run both services and then call "http://localhost:3333/hello/rsocket".
Comment From: abdulrahimseera
@bclozel Thank you very much for your quick reply. I would greatly appreciate your help.