We cannot get a trivial Spring WebFlux + RSocket route to work when using Kotlin coroutines instead of Reactor, despite extensive research through blogs and documentation, and even after reading the integration tests for the official Spring coroutine extensions for RSocketRequester.
Here is the basic route:
// with Reactor (works fine):
@MessageMapping("foo")
fun foo(): Mono<Void> {
    println("test")
    return Mono.empty()
}
// with a suspend function instead (throws an exception):
@MessageMapping("foo")
suspend fun foo() {
    println("test")
}
We get an exception both when calling the route from JUnit 5 (via RSocketRequester) and from an RSocketJS client. For such a trivial example, one would guess that we have an issue with our config/dependencies, but we have followed the instructions for autoconfiguration.
We have the following Gradle dependencies related to this:
implementation("org.springframework.boot:spring-boot-starter-rsocket")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-integration")
implementation("org.springframework.integration:spring-integration-rsocket")
implementation("org.springframework.integration:spring-integration-test")
testImplementation("io.kotlintest:kotlintest-runner-junit5:3.3.0")
testImplementation("io.kotlintest:kotlintest-extensions-spring:3.1.10")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
annotationProcessor("org.projectlombok:lombok")
Most importantly, we have the following Kotlin coroutine dependencies:
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.5")
We use Spring Boot 2.2.5, which ships Spring Framework 5.2.4; coroutine support was added in Spring Framework 5.2.
plugins {
    id("org.springframework.boot") version "2.2.5.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    kotlin("jvm") version "1.3.61"
    kotlin("plugin.spring") version "1.3.61"
    id("org.flywaydb.flyway") version "6.2.1"
}
We have not added any special annotation for coroutine support, as we believe autoconfiguration should work as expected.
What are we missing? If this is a bug, please fix it; otherwise, please improve the documentation/examples, as we have read almost everything available online and couldn't figure out the issue.
The thrown exception is the following:
java.lang.IllegalStateException: Invocation failure
Endpoint [com.brainflow.brainflowserver.controllers.projects.realtime.ConnectionPocController]
Method [public java.lang.Object com.brainflow.brainflowserver.controllers.projects.realtime.ConnectionPocController.damn2(kotlin.coroutines.Continuation<? super kotlin.Unit>)] with argument values:
[0] [null]
at org.springframework.messaging.handler.invocation.reactive.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:153) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
Caused by: java.lang.NoClassDefFoundError: kotlin/coroutines/AbstractCoroutineContextKey
FULL LOG ->
2020-04-27 07:32:06.835 INFO 22592 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-27 07:32:07.309 INFO 22592 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-04-27 07:32:07.424 INFO 22592 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-04-27 07:32:07.424 INFO 22592 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-04-27 07:32:07.424 INFO 22592 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-04-27 07:32:07.689 INFO 22592 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 8085
2020-04-27 07:32:07.743 INFO 22592 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-04-27 07:32:07.751 INFO 22592 --- [ main] c.b.b.BrainflowServerApplicationKt : Started BrainflowServerApplicationKt in 5.471 seconds (JVM running for 6.892)
sdfhusdfuidshufh
2020-04-27 07:32:37.620 ERROR 22592 --- [or-http-epoll-2] o.s.m.h.i.reactive.InvocableHelper : No exception handling method
java.lang.IllegalStateException: Invocation failure
Endpoint [com.brainflow.brainflowserver.controllers.projects.realtime.ConnectionPocController]
Method [public java.lang.Object com.brainflow.brainflowserver.controllers.projects.realtime.ConnectionPocController.damn2(kotlin.coroutines.Continuation<? super kotlin.Unit>)] with argument values:
[0] [null]
at org.springframework.messaging.handler.invocation.reactive.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:153) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2267) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:318) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4110) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4110) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:147) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at io.rsocket.RSocketResponder.handleRequestResponse(RSocketResponder.java:387) ~[rsocket-core-1.0.0-RC6.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) ~[rsocket-core-1.0.0-RC6.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:154) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.5.RELEASE.jar:0.9.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:36) ~[rsocket-transport-netty-1.0.0-RC6.jar:na]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.46.Final.jar:4.1.46.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.46.Final.jar:4.1.46.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.46.Final.jar:4.1.46.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.lang.NoClassDefFoundError: kotlin/coroutines/AbstractCoroutineContextKey
at java.base/java.lang.ClassLoader.defineClass1(Native Method) ~[na:na]
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016) ~[na:na]
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:151) ~[na:na]
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:821) ~[na:na]
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:719) ~[na:na]
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:642) ~[na:na]
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:600) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
at kotlinx.coroutines.CoroutineDispatcher.<clinit>(CoroutineDispatcher.kt) ~[kotlinx-coroutines-core-1.3.5.jar:na]
at kotlinx.coroutines.CoroutineContextKt.createDefaultDispatcher(CoroutineContext.kt:23) ~[kotlinx-coroutines-core-1.3.5.jar:na]
at kotlinx.coroutines.Dispatchers.<clinit>(Dispatchers.kt:33) ~[kotlinx-coroutines-core-1.3.5.jar:na]
at org.springframework.core.CoroutinesUtils.invokeSuspendingFunction(CoroutinesUtils.kt:71) ~[spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.reactive.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:137) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
... 66 common frames omitted
Caused by: java.lang.ClassNotFoundException: kotlin.coroutines.AbstractCoroutineContextKey
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
... 80 common frames omitted
We really want to fix this issue. We can provide our full build.gradle.kts file, or even our full example code, if necessary.
@sdeleuze I am pinging you as I know you are involved in Spring RSocket coroutine support and implemented the following commits for the issue https://github.com/spring-projects/spring-framework/issues/22780:
https://github.com/sdeleuze/spring-framework/commit/842e7e5ef71e30e9ff3a37af11b68b626e7fb350
https://github.com/sdeleuze/spring-framework/commit/1ee65fa032b76bcdcf6567781d1566d9cedce69e
Thank you a lot in advance.
Comment From: LifeIsStrange
My understanding of the autoconfiguration is based on what @sdeleuze said in his official blog post:
Coroutines support is enabled when kotlinx-coroutines-core and kotlinx-coroutines-reactor dependencies are in the classpath:
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow
Edit: misclick, sorry for closing.
Comment From: sdeleuze
Hey, Coroutines 1.3.5 requires Kotlin 1.3.7x but you are using 1.3.61, could you try with Kotlin 1.3.72 for example?
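For readers applying this suggestion, a minimal sketch of the change in build.gradle.kts could look like the following. It only bumps the Kotlin plugin versions from the plugins block posted above. Setting kotlin("plugin.spring") to the same version is an assumption of this sketch, not something confirmed in this thread; if that version does not resolve in your build, keep the one that does.

```kotlin
// build.gradle.kts (sketch): only the Kotlin versions differ from the block in the report
plugins {
    id("org.springframework.boot") version "2.2.5.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    // Was 1.3.61; kotlinx-coroutines 1.3.5 needs a Kotlin 1.3.7x standard library.
    kotlin("jvm") version "1.3.72"
    // Assumption: the Spring compiler plugin is available at the same version.
    kotlin("plugin.spring") version "1.3.72"
    id("org.flywaydb.flyway") version "6.2.1"
}
```

The kotlinx-coroutines dependencies themselves can stay at 1.3.5.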
Comment From: LifeIsStrange
Thank you @sdeleuze, that was exactly the root cause! You saved us a ton of time; that was really helpful!
As a side note, we are now using Kotlin 1.3.72 but still using kotlin("plugin.spring") version "1.3.61" because it hasn't yet been ported to 1.3.72. Would you advise us to keep it like that, or to downgrade kotlinx.coroutines?
Anyway, this turned out not to be a bug (in an ideal world kotlinx.coroutines would detect the version mismatch instead of throwing an obscure exception, but it was still our mistake).
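Until such a check exists, an application can do a rough one itself at startup. The sketch below is only an illustration, not part of Spring or kotlinx.coroutines: isClassPresent is a hypothetical helper, and the class name checked is simply the one reported missing in the stack trace above.

```kotlin
// Hypothetical helper: true if the named class can be loaded from the current classpath.
fun isClassPresent(name: String): Boolean =
    try {
        Class.forName(name)
        true
    } catch (e: ClassNotFoundException) {
        false
    } catch (e: LinkageError) {
        // Covers NoClassDefFoundError and similar loading failures.
        false
    }

fun main() {
    // KotlinVersion.CURRENT reports the version of the Kotlin standard library in use.
    println("Kotlin stdlib version: ${KotlinVersion.CURRENT}")

    // The class that the NoClassDefFoundError in this issue complained about.
    val requiredClass = "kotlin.coroutines.AbstractCoroutineContextKey"
    if (!isClassPresent(requiredClass)) {
        println("WARNING: $requiredClass is missing; the Kotlin stdlib is probably older than kotlinx-coroutines expects")
    }
}
```

Running this with the same classpath as the application shows quickly whether the standard library and the coroutines library are out of step, before any suspend handler is invoked.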
Comment From: yschimke
Think I also hit this, thanks @rstoyanchev
https://rsocket.slack.com/archives/C9S4A2CFQ/p1600158322033600