Hey everyone, I am facing a strange problem and cannot figure out what is going on. It involves RSocket and WebClient used together. What I am trying to achieve is to receive a stream of data via RSocket and stream that data on to an endpoint via WebClient. This is my code.
Spring Boot: 2.4.1
Dependency Management: 1.0.10.RELEASE
Spring Cloud Dependencies: 2020.0.0
RSocket:
@MessageMapping(Api.Message.UPLOAD_FILE)
fun uploadFile(
    @Header(Api.Header.MIME_FILE_NAME)
    fileName: String,
    @Validated
    @Payload
    data: Flux<DataBuffer>
) = service.upload(fileName, data, "test")
WebClient:
fun upload(fileName: String, data: Flux<DataBuffer>, token: String? = null) =
    webClient
        .post()
        .uri(
            UriComponentsBuilder
                .fromUriString("https://something.com")
                .queryParam("filename", fileName)
                .also { builder -> token?.let { value -> builder.queryParam(token, value) } }
                .build(true)
                .toUri()
        )
        .header(HttpHeaders.AUTHORIZATION, "Bearer Test")
        .contentType(MediaType.asMediaType(MimeType.valueOf("application/binary")))
        .body(data, DataBuffer::class.java)
        .exchangeToMono { response ->
            if (response.statusCode().isError) {
                handleResponseError(response)
            } else {
                response.bodyToMono()
            }
        }
The problem is that when I use the Publisher that comes from RSocket, the body sent through WebClient appears to be empty (zero-length), and I get the error below:
org.springframework.web.reactive.function.client.WebClientRequestException: refCnt: 0; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
cause -> IllegalReferenceCountException: refCnt: 0
But if I change the body to any other Publisher, it works with no problem.
For example, changing this:
.body(data, DataBuffer::class.java)
to this (or to any other Publisher):
.body(Flux.just("Test","Test2"), String::class.java)
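For context, refCnt: 0 is Netty's way of reporting that a reference-counted buffer was accessed after it had already been released. Below is a minimal sketch for narrowing this down, assuming the buffers are still readable when the handler receives them: it copies each DataBuffer into a ByteArray and releases the original, so WebClient never touches the buffers that came from RSocket. The helper name copyToByteArrays is hypothetical and not part of the code above, and this is a diagnostic idea rather than a confirmed fix.

```kotlin
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
import reactor.core.publisher.Flux

// Hypothetical helper (not from the original code): copies every incoming
// buffer into a plain ByteArray and then releases the original buffer.
// Assumption: the handler owns the buffers and they have not been released yet.
fun copyToByteArrays(data: Flux<DataBuffer>): Flux<ByteArray> =
    data.map { buffer ->
        try {
            // Read all readable bytes of this buffer into a fresh array.
            ByteArray(buffer.readableByteCount()).also { bytes -> buffer.read(bytes) }
        } finally {
            // Release pooled (e.g. Netty) buffers; a no-op for non-pooled ones.
            DataBufferUtils.release(buffer)
        }
    }
```

It could be used as .body(copyToByteArrays(data), ByteArray::class.java) in place of the original .body(...) call. If the same exception is then thrown from inside the map step, the buffers were probably released before they reached the handler.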
Comment From: ghahramani
I found something strange: I only get that error when RSocket security is involved. If I remove the RSocket dependencies and the code below, it starts working. (I have also created an issue in the Spring Security repo with more detail: https://github.com/spring-projects/spring-security/issues/9345)
@EnableRSocketSecurity
class RSocketSecurityConfiguration {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun messageHandler(strategies: RSocketStrategies): RSocketMessageHandler {
        val handler = RSocketMessageHandler()
        handler.argumentResolverConfigurer.addCustomResolver(AuthenticationPrincipalArgumentResolver())
        handler.rSocketStrategies = strategies
        logger.info("Configuring RSocket handler")
        return handler
    }

    @Bean
    fun authorization(
        security: RSocketSecurity,
        authenticationManager: ReactiveAuthenticationManager
    ): PayloadSocketAcceptorInterceptor = security
        .authorizePayload { spec: AuthorizePayloadsSpec ->
            logger.info("Configuring RSocket authorization and jwt")
            spec
                .route("api.**").authenticated()
                .anyExchange().permitAll()
        }
        .jwt { customizer -> customizer.authenticationManager(authenticationManager) }
        .build()
}
Comment From: wilkinsona
Thanks for the report and for narrowing down the source of the problem. I've subscribed to the Spring Security issue, which I think should supersede this one. If the resolution of the Security issue requires changes in Boot, we can re-open this issue.