• Affects: 6.1.1 (Spring Web)

Context

I created an ExchangeFilterFunction that exported response to a bucket for some paths. Tested it and everything worked fine using it directly customizing a WebClient. When I did a E2E test, using my annotation based route, I got io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1. I don't understand what is going on.

Workaround

Using a functional route, it works. After several hours of testing, I tried it just for the sake of it.

I created a minimal reproduction repository. In this repo, I just intercept response and write it to a temp file. I don't clean that file, just saying.

To use reproduction repo, just use ./gradlew test after starting json server with docker-compose up -d json-server.

There are three tests: 1. One using directly customized WebClient. It works 2. One calls an annotation based route and then uses customized WebClient. It fails. 3. One calls a functional route and then uses customized WebClient. It works.

Comment From: rstoyanchev

The exception means that buffers are read more than once. I couldn't work out the underlying reason, but there is support in DataBufferUtils for writing to a file so there is no need to deal with InputStream and OutputStream.

To make all tests pass, I replaced this:

@Component
class Customizer : WebClientCustomizer {
  private val bufferFactory = DefaultDataBufferFactory()
  private val log = LoggerFactory.getLogger(javaClass)

  override fun customize(webClientBuilder: WebClient.Builder) {
    webClientBuilder.filter(
        ExchangeFilterFunction.ofResponseProcessor {
          Mono.just(it.mutate().body(::writeToTmpFile).build())
        },
    )
  }

  fun writeToTmpFile(oldData: Flux<DataBuffer>): Flux<DataBuffer> =
      Mono.fromCallable { Files.createTempFile("sample", ".out") }
          .doOnNext { log.info("Writing response data to file {}", it) }
          .flatMap { writeBuffersToPath(oldData, it).thenReturn(it) }
          .doOnNext { log.info("Response data saved to file {}", it) }
          .flatMapMany { DataBufferUtils.readInputStream(it::inputStream, bufferFactory, 4096) }

  fun writeBuffersToPath(buffer: Flux<DataBuffer>, path: Path) =
      Mono.using(
          { path.outputStream() },
          { DataBufferUtils.write(buffer, it).then() },
          { it.close() },
      )
}

with this:

@Component
class Customizer : WebClientCustomizer {

  override fun customize(webClientBuilder: WebClient.Builder) {
    webClientBuilder.filter(
        ExchangeFilterFunction.ofResponseProcessor {
            Mono.just(it.mutate()
                .body { oldData ->
                    val path = Files.createTempFile("sample", ".out")
                    DataBufferUtils.write(oldData, AsynchronousFileChannel.open(path, StandardOpenOption.WRITE))
                }
                .build())
        },
    )
  }
}


Comment From: markitovtr1

Hi, @rstoyanchev .

Thanks for taking a look in this issue.

Just for the record in case someone gets to this issue: my use case was different than this one. I'm actually uploading DataBuffer to S3. Migrating to functional endpoint solved my problem.

If I get the chance to test this again in my codebase, I'll try using DataBufferUtils methods to see if it solves. Since I moved my controllers to use functional routes, I don't think I'll be able to test this soon.

Comment From: rstoyanchev

Thanks for the additional detail. I would say more generally, if you need to intercept buffers in a Flux and write them somewhere, it's easier if you can avoid going through an InputStream and OutputStream, and make sure reference counts are incremented accordingly if whatever component does the writing is going to release them.