In spring-messaging-5.2.5-RELEASE, I observed in the debugger that to successfully complete the connection setup after intercepting and caching a Server Requester, the Kotlin @ConnectMapping method must declare a return type of Monoif block:
if (returnValue == null) {
return handleNoContent(returnType, message);
}
and that resulted in a failure to establish the connection with a RejectedSetupException.
Comment From: pivotal-issuemaster
@richmeyer7 Please sign the Contributor License Agreement!
Click here to manually synchronize the status of this Pull Request.
See the FAQ for frequently asked questions.
Comment From: pivotal-issuemaster
@richmeyer7 Thank you for signing the Contributor License Agreement!
Comment From: rstoyanchev
From the reference docs:
Keep in mind that
@ConnectMappingmethods are essentially handlers of the SETUP frame which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling.
In other words the completion of a @ConnectMapping method cannot be tied to the start of a request. Please, provide details on what does your method look like.
Comment From: richmeyer7
Thank you, Rossen.
My method is below. It does not start a request. It merely caches the RSocketRequester and returns. I checked that the RSocketRequester did not get used immediately upon entering the cache. I found that returning any value other than null caused the connection setup to fail. Also, declaring the method to return Unit (void) and not providing a return value caused the connection setup to fail. I stepped through the Spring handling in the debugger for various return types and values, and found that the only successful path was to return null so that method AbstractEncoderMethodReturnValueHandler.handleReturnValue(...) executes its "if (returnValue == null)" block and returns, not executing the remainder of the method. This may turn out to be unintentional behavior in AbstractEncoderMethodReturnValueHandler.handleReturnValue(...) and become a bug instead of a documentation issue... if I have not misunderstood something.
private val backendServiceRsocketRequesterReplayProcessors: ConcurrentHashMap<String, ReplayProcessor<RSocketRequester>> =ConcurrentHashMap()
@ConnectMapping("ApiService.registerBackendService") @MessageMapping("ApiService.registerBackendService") suspend fun registerBackendService(@Payload name: String,rSocketRequester: RSocketRequester): Mono
? { backendServiceRSocketRequesterReplayProcessors.computeIfAbsent(name) { ReplayProcessor.cacheLast() }
.onNext(rSocketRequester) return null // This is the only value I found that doesn't triggerRejectedSetupException.
}
- Rich
On Mon, May 11, 2020 at 11:24 PM Rossen Stoyanchev notifications@github.com wrote:
From the reference docs:
Keep in mind that @ConnectMapping methods are essentially handlers of the SETUP frame which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling.
In other words the completion of a @ConnectMapping method cannot be tied to the start of a request. Please, provide details on what does your method look like.
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/spring-projects/spring-framework/pull/25032#issuecomment-627136584, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC2Z4LYS4F5GZITQUA3X2U3RRDTS5ANCNFSM4M3SU2EQ .
Comment From: rstoyanchev
@richmeyer7, replying through email doesn't support markdown and doesn't work very well for code samples (see above).
I'm pasting your sample here as best as I could extract it.
private val backendServiceRsocketRequesterReplayProcessors:
ConcurrentHashMap<String, ReplayProcessor<RSocketRequester> = ConcurrentHashMap()
@ConnectMapping("ApiService.registerBackendService")
@MessageMapping("ApiService.registerBackendService")
suspend fun registerBackendService(@Payload name: String, rSocketRequester: RSocketRequester): Mono<Void> {
backendServiceRSocketRequesterReplayProcessors
.computeIfAbsent(name) { ReplayProcessor.cacheLast() }
.onNext(rSocketRequester)
return null // This is the only value I found that doesn't trigger RejectedSetupException.
}
I don't know if having both @ConnectMapping and @MessageMapping is intentional. Are you intercepting both connection opening and a request with the same method?
That aside the method works without a return value if suspend is removed. Do you really mean for this to use coroutines here?
Comment From: sdeleuze
If you want to translate a Reactive method that returns Mono<Void> to Coroutines, you should use suspend fun registerBackendService(@Payload name: String, rSocketRequester: RSocketRequester) { ... }, not suspend fun registerBackendService(@Payload name: String, rSocketRequester: RSocketRequester): Mono<Void> { ... } which is conceptually a Mono<Mono<Void>> Reactive return value.
Comment From: richmeyer7
@rstoyanchev -
Thank you for extracting my sample, Rossen. I inadvertently replied through email.
I see only one discrepancy, that my sample return type was actually Mono<Void>? (nullable).
I had the @MessageMapping because I had multiple connecting services, and needed each connecting service to provide a name that my code used to keep track of which cached RSocketRequester was connected to which service. In case that is a problem, I have now removed @MessageMapping. My architecture has changed to a single connecting service.
I used a suspend fun because that is what the Kotlin example shows in the referenced documentation.
I retested without the suspend and with no return type, and the connection now succeeds. Here is the successful method:
@ConnectMapping()
fun registerApiService(rSocketRequester: RSocketRequester) {
apiServiceRSocketRequesterReplayProcessor.onNext(rSocketRequester)
GlobalScope.launch {
// TODO: Send all faults (non-periodic state) to the ApiService.
}
}
With suspend and no return type as shown in the existing documentation, the connection always fails with RejectedSetupException: Missing 'rsocketResponse'. Here is the failing method:
@ConnectMapping() // Always fails with "suspend":
suspend fun registerApiService(rSocketRequester: RSocketRequester) {
apiServiceRSocketRequesterReplayProcessor.onNext(rSocketRequester)
GlobalScope.launch {
// TODO: Send all faults (non-periodic state) to the ApiService.
}
}
Perhaps instead of my requested documentation change, the suspend should be removed from the Kotlin sample in the documentation? The sample does not work for me as published.
@sdeleuze -
Thank you, Sébastien. I did not actually need to return a value, except I found that my connection always failed with suspend if I did not declare a return type.
My connection now succeeds without suspend and without a return type.
Comment From: rstoyanchev
@richmeyer7 I'm sorry but I can't reproduce the issue. Please provide a sample if you'd like us to continue to look into this.