Hey everyone, I am facing a strange problem and cannot figure out what is going on. It involves RSocket and WebClient together. What I am trying to achieve is to receive a stream of data via RSocket and forward that stream to an HTTP endpoint via WebClient. This is my code.

Spring Boot: 2.4.1
Dependency Management: 1.0.10.RELEASE
Spring Cloud Dependencies: 2020.0.0

RSocket:

@MessageMapping(Api.Message.UPLOAD_FILE)
fun uploadFile(
    @Header(Api.Header.MIME_FILE_NAME)
    fileName: String,

    @Validated
    @Payload
    data: Flux<DataBuffer>
) = service.upload(fileName, data, "test")

WebClient:

fun upload(fileName: String, data: Flux<DataBuffer>, token: String? = null) =
        webClient
            .post()
            .uri(
                UriComponentsBuilder
                    .fromUriString("https://something.com")
                    .queryParam("filename", fileName)
                    // Append the token as a query parameter when present ("token" is an assumed parameter name).
                    .also { builder -> token?.let { value -> builder.queryParam("token", value) } }
                    .build(true)
                    .toUri()
            )
            .header(HttpHeaders.AUTHORIZATION, "Bearer Test")
            .contentType(MediaType.asMediaType(MimeType.valueOf("application/binary")))
            .body(data, DataBuffer::class.java)
            .exchangeToMono { response ->
                if (response.statusCode().isError) {
                    handleResponseError(response)
                } else {
                    response.bodyToMono()
                }
            }

The problem is that when I use the Publisher that comes from RSocket, the request body handed to WebClient appears to be empty (zero-length), and I get the error below:

org.springframework.web.reactive.function.client.WebClientRequestException: refCnt: 0; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
cause -> IllegalReferenceCountException: refCnt: 0
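
For context, that exception comes from Netty's reference counting: a buffer whose reference count has dropped to 0 has been deallocated, and any later access to it is rejected. The sketch below reproduces the same kind of failure in isolation. It assumes only that Netty is on the classpath (RSocket and Reactor Netty both bring it in), and `readAfterRelease` is a hypothetical name used purely for illustration. It does not show where the release happens in the setup above.

```kotlin
import io.netty.buffer.Unpooled
import io.netty.util.IllegalReferenceCountException

// Hypothetical helper: writes to a buffer, releases it, then tries to read from it.
fun readAfterRelease(): String {
    // A freshly allocated Netty buffer starts with a reference count of 1.
    val buffer = Unpooled.buffer()
    buffer.writeBytes(byteArrayOf(1, 2, 3))

    // Releasing the last reference drops the count to 0 and deallocates the buffer.
    buffer.release()

    return try {
        // Any access after the count reaches 0 is rejected by Netty.
        buffer.readByte()
        "read succeeded"
    } catch (e: IllegalReferenceCountException) {
        "read failed: ${e.message}"
    }
}

fun main() {
    println(readAfterRelease())
}
```

If the buffers arriving from RSocket have already been released by the time WebClient writes them, the same exception would surface wrapped in the WebClientRequestException shown above.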

But if I change the body to any other Publisher, it works without any problem.

So changing this:

.body(data, DataBuffer::class.java)

To this or anything else:

.body(Flux.just("Test","Test2"), String::class.java)

Comment From: ghahramani

I found something strange: I only get that error when RSocket security is involved. If I remove the RSocket dependencies and the code below, it starts working. (I have also created an issue in the Spring Security repo with more detail: https://github.com/spring-projects/spring-security/issues/9345)

@EnableRSocketSecurity
class RSocketSecurityConfiguration {

    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun messageHandler(strategies: RSocketStrategies): RSocketMessageHandler {
        val handler = RSocketMessageHandler()
        handler.argumentResolverConfigurer.addCustomResolver(AuthenticationPrincipalArgumentResolver())
        handler.rSocketStrategies = strategies
        logger.info("Configuring RSocket handler")
        return handler
    }

    @Bean
    fun authorization(
        security: RSocketSecurity,
        authenticationManager: ReactiveAuthenticationManager
    ): PayloadSocketAcceptorInterceptor = security
        .authorizePayload { spec: AuthorizePayloadsSpec ->
            logger.info("Configuring RSocket authorization and jwt")
            spec
                .route("api.**").authenticated()
                .anyExchange().permitAll()
        }
        .jwt { customizer -> customizer.authenticationManager(authenticationManager) }
        .build()

}

Comment From: wilkinsona

Thanks for the report and for narrowing down the source of the problem. I've subscribed to the Spring Security issue which I think should supersede this one. If the resolution of the Security issue requires changes in Boot, we can re-open this issue.