This is based on investigation under https://github.com/reactor/reactor-netty/issues/1374.

In DataBufferUtils#join we use a custom List to collect buffers and check the total buffered size. When adding a buffer pushes the total past the limit, an error is raised, but the current buffer is then released twice: Reactor's MonoCollect discards both the item that was passed to the add method and all items in the list, which by then already contains that item. As a result the refCount is decremented twice when it should be decremented only once.
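The double release described above can be reproduced in isolation. The following is only a minimal sketch in plain Java, not the Spring or Reactor code: CountedBuffer, LimitedList, collect and the storeBeforeCheck flag are hypothetical names made up for illustration, and the "check before store" ordering is shown as one possible way to avoid the problem, not necessarily the change that was made in the framework.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DoubleReleaseSketch {

    /** Hypothetical stand-in for a reference-counted buffer (not the Netty or Spring type). */
    static final class CountedBuffer {
        final int size;
        int refCount = 1;
        int illegalReleases = 0;

        CountedBuffer(int size) {
            this.size = size;
        }

        void release() {
            if (refCount == 0) {
                // A real buffer would throw IllegalReferenceCountException here.
                illegalReleases++;
            } else {
                refCount--;
            }
        }
    }

    /** Hypothetical list that enforces a byte limit when buffers are added. */
    static final class LimitedList extends ArrayList<CountedBuffer> {
        private final int maxBytes;
        private final boolean storeBeforeCheck;
        private int total;

        LimitedList(int maxBytes, boolean storeBeforeCheck) {
            this.maxBytes = maxBytes;
            this.storeBeforeCheck = storeBeforeCheck;
        }

        @Override
        public boolean add(CountedBuffer buffer) {
            if (storeBeforeCheck) {
                super.add(buffer);   // buffer is already in the list ...
                checkLimit(buffer);  // ... when the limit error is raised
                return true;
            }
            checkLimit(buffer);      // raise the error first, store only on success
            return super.add(buffer);
        }

        private void checkLimit(CountedBuffer buffer) {
            if (total + buffer.size > maxBytes) {
                throw new IllegalStateException("limit exceeded");
            }
            total += buffer.size;
        }
    }

    /** Mimics the discard behaviour described above: on error, release the current item and every item in the list. */
    static int collect(List<CountedBuffer> source, LimitedList target) {
        for (CountedBuffer current : source) {
            try {
                target.add(current);
            } catch (IllegalStateException ex) {
                current.release();
                target.forEach(CountedBuffer::release);
                break;
            }
        }
        int illegal = 0;
        for (CountedBuffer buffer : source) {
            illegal += buffer.illegalReleases;
        }
        return illegal;
    }

    /** Runs three 4-byte buffers against a 10-byte limit and returns the number of illegal releases. */
    static int run(boolean storeBeforeCheck) {
        List<CountedBuffer> source = Arrays.asList(
                new CountedBuffer(4), new CountedBuffer(4), new CountedBuffer(4));
        return collect(source, new LimitedList(10, storeBeforeCheck));
    }

    public static void main(String[] args) {
        System.out.println("store before check: " + run(true));
        System.out.println("check before store: " + run(false));
    }
}
```

In this sketch, run(true) reports one illegal release (the third buffer is released once as the current item and once more as a list member), while run(false) reports none.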

Comment From: zhou-hao

Will 5.2.13 fix this problem?

Comment From: bclozel

@zhou-hao it was fixed in 5.2.12, see the linked issue #26061

Comment From: zhou-hao

When I download a large file:

return webClient
                .get()
                .uri(location)
                .retrieve()
                .bodyToFlux(DataBuffer.class)
                .as(dataStream -> {
                    Path filePath = file.toPath();
                    log.debug("download protocol file {} to {}", location, file.getAbsolutePath());
                    try {
                        @SuppressWarnings("all")
                        AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(filePath, CREATE, WRITE);
                        return DataBufferUtils
                            .write(dataStream, asynchronousFileChannel)
                            .doOnNext(DataBufferUtils.releaseConsumer())
                            .doAfterTerminate(() -> {
                                try {
                                    asynchronousFileChannel.close();
                                } catch (IOException ignored) {
                                }
                            })
                            .then(Mono.just(file.getAbsolutePath()));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })

it throws:

2021-01-19 20:14:52.315 ERROR 38415 --- [r-http-kqueue-5] reactor.core.publisher.Operators         : Operator called default onErrorDropped

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_           ⇢ at reactor.netty.channel.ChannelOperations.receiveObject(ChannelOperations.java:235)
    |_ Flux.from ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:463)
    |_  Flux.map ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:464)
    |_ Flux.from ⇢ at reactor.netty.ByteBufFlux.fromInbound(ByteBufFlux.java:72)
Stack trace:
        at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
        at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
        at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
        at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
        at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:264)
        at reactor.netty.channel.FluxReceive.lambda$request$1(FluxReceive.java:135)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:293)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Comment From: bclozel

@zhou-hao Could you create a new issue with a sample project reproducing the problem?

Comment From: zhou-hao

OK, I'll try my best.


Comment From: zhou-hao

Sorry, that was my mistake:

 return DataBufferUtils
     .write(dataStream, file.toPath(), CREATE, WRITE)