The following handler, written as a suspend function, throws an IllegalArgumentException: Missing 'rsocketResponse'. The println call still prints the data to the console as expected. The handler works only when it is rewritten to return Mono: fun enter(data: Any): Mono<Void>. The same issue reproduces for methods annotated with @ConnectMapping.

Reproduced with Spring Boot 2.2.0.RELEASE and 2.2.1.SNAPSHOT.

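import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.stereotype.Controller

@Controller
class Api {
    // Handles fireAndForget requests sent to the "test" route.
    @MessageMapping("test")
    suspend fun enter(data: Any) {
        println(data)
    }
}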

Stacktrace:

java.lang.IllegalArgumentException: Missing 'rsocketResponse'
    at org.springframework.util.Assert.notNull(Assert.java:198)
    at org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler.handleEncodedContent(RSocketPayloadReturnValueHandler.java:65)
    at org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler.lambda$handleReturnValue$0(AbstractEncoderMethodReturnValueHandler.java:124)
    at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator$WriteBarrier.onComplete(ChannelSendOperator.java:251)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1820)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:129)
    at kotlinx.coroutines.reactor.MonoCoroutine.onCompleted(Mono.kt:71)
    at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:102)
    at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:275)
    at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:807)
    at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:787)
    at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
    at kotlinx.coroutines.DispatchedKt.resumeCancellable(Dispatched.kt:457)
    at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
    at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:109)
    at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:154)
    at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt:60)
    at kotlinx.coroutines.reactor.MonoKt$monoInternal$1.accept(Mono.kt)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:53)
    at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:85)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2148)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132)
    at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:318)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
    at io.rsocket.RSocketResponder.handleFireAndForget(RSocketResponder.java:366)
    at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:296)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:457)
    at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:153)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:830)

Comment From: sdeleuze

I can't reproduce it, and we have tests for that use case in https://github.com/spring-projects/spring-framework/blob/master/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt#L139-L142, could you please provide a repro project?

Comment From: denyshorman

@sdeleuze, thanks for the reply! I have prepared backend and UI projects where you can easily reproduce the issue.

Steps to reproduce:

1. Unzip rsocket-issue.zip.
2. Open the backend project in IntelliJ and run it.
3. Open the ui project in IntelliJ, install the suggested dependencies, and run "Angular CLI Server".
4. Go to http://localhost:4200 in a browser; it will automatically send a fireAndForget request to the backend.
5. On the server side you should see the message from the UI, along with the Missing 'rsocketResponse' exception.

Please let me know if you need additional details. Thanks

Comment From: rstoyanchev

@sdeleuze, I have not tried the code, but it sounds like the framework is actually seeing a return value when suspend is used. The fact that it works with Mono<Void> shows that the handling works otherwise. For a fire-and-forget interaction, the @MessageMapping method is not expected to return any value.
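
For reference, a minimal sketch of the Mono<Void> variant that the reporter says works. Only the signature comes from the report; the method body here is an assumption.

```kotlin
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.stereotype.Controller
import reactor.core.publisher.Mono

@Controller
class Api {
    // Signature taken from the report; the body is an assumed equivalent
    // of the suspend version (print the payload, then complete empty).
    @MessageMapping("test")
    fun enter(data: Any): Mono<Void> {
        println(data)
        return Mono.empty()
    }
}
```

Because the returned Mono completes without emitting a value, there is nothing to encode as a response, which matches what a fire-and-forget interaction expects.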

At the very least, we could raise a better error message. There is a header that indicates the RSocket frame type, and based on that we could reject handler methods with a return value for fireAndForget or setup frames.

Comment From: sdeleuze

I guess what probably happens is that, without specific Coroutines handling, a suspending method that returns no value will, at the bytecode level, return an Object and take an additional Continuation parameter. We may have to refine our support to handle this case (and indeed improve the messages).
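
The bytecode shape described above can be checked with plain Java reflection, without Spring on the classpath. This is only a sketch: Api mirrors the reporter's controller minus the annotations, and describeEnter is a hypothetical helper introduced for illustration.

```kotlin
// Same shape as the controller in the report, minus the Spring annotations.
class Api {
    suspend fun enter(data: Any) {
        println(data)
    }
}

// Hypothetical helper: returns the JVM-level return type and parameter
// types of the compiled `enter` method, as seen through Java reflection.
fun describeEnter(): Pair<String, List<String>> {
    val method = Api::class.java.declaredMethods.first { it.name == "enter" }
    return method.returnType.name to method.parameterTypes.map { it.name }
}

fun main() {
    val (returnType, parameterTypes) = describeEnter()
    println(returnType)      // java.lang.Object
    println(parameterTypes)  // [java.lang.Object, kotlin.coroutines.Continuation]
}
```

A handler that inspects only the Java method signature therefore sees a method that returns Object rather than void, which would explain why a return value is detected for a fire-and-forget request.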

Comment From: MichalJana

Hi, I have a similar bug, described here: SpringBoot RSockets Kotlin - Missing 'rsocketResponse'

Comment From: sdeleuze

@MichalJana Please create a new issue with a project that allows us to reproduce it.