Affects: 5.1.4.RELEASE
I see buffer leak reports from Netty's ResourceLeakDetector when a Mono.timeout cancels a subscription to the Mono returned from bodyToMono. I've created a demo project with a test case which seems to reliably reproduce the issue. A separate test case is included which uses the reactor-netty HttpClient directly instead of using it through the WebClient wrapper - and I have not seen any leak reports in this case with the current version of reactor-netty (v0.8.4.RELEASE).
The demo project with the mentioned test cases can be found here: https://github.com/danielra/buffer-leak-repro-reactor-netty
The specific test case which reproduces the leak can be seen here: https://github.com/danielra/buffer-leak-repro-reactor-netty/blob/master/src/test/java/com/example/demo/DemoApplicationTests.java#L59
The tests can be run via ./gradlew clean test --debug. Please note that the tests "pass" unconditionally, but buffer leak reports can be viewed in the console output.
Here is an example of the leak report log messages I have observed from this test case: webclient_buffer_leak_example_log.txt
Comment From: rstoyanchev
Thanks for the report. This has been tagged as a Reactor Netty issue. Please follow https://github.com/reactor/reactor-netty/issues/603.
Comment From: danielra
Thanks. I will follow that issue as you suggest. Interesting that the issue here is actually with reactor-netty, when the problem doesn't seem to reproduce when using their HttpClient directly - only when using reactor-netty indirectly via WebClient. Thanks for looking into it and forwarding appropriately!
Comment From: rstoyanchev
Interesting that the issue here is actually with reactor-netty, when the problem doesn't seem to reproduce when using their HttpClient directly - only when using reactor-netty indirectly via WebClient.
Are you sure about this? I see the exact opposite. When I run the tests with @Ignore on reproBufferLeakOnTimeout I see the leaks, and conversely, if I put @Ignore on noReproBufferLeakOnTimeout the leak reports go away.
This is also supported by the source code. Reactor Netty's ByteBufFlux#aggregate accumulates and retains buffers without any provision for cancellation. By contrast, the StringDecoder does use onDiscard.
So as far as I can see we're okay in the Spring Framework. /cc @violetagg
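To make the point about cancellation concrete, here is a minimal plain-JDK sketch of an aggregator that retains chunks until completion. It is only a model under stated assumptions: CountedChunk, Aggregator, and leakedAfterCancel are hypothetical names, and this is not how Reactor Netty's ByteBufFlux#aggregate or Spring's StringDecoder is actually implemented. The releaseOnCancel flag stands in for a discard hook.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of reference-counted aggregation; not Reactor Netty's implementation.
final class AggregationCancelSketch {

    /** Hypothetical stand-in for a pooled, reference-counted buffer. */
    static final class CountedChunk {
        private int refCnt = 1; // a new chunk starts with one reference
        final String data;

        CountedChunk(String data) { this.data = data; }

        CountedChunk retain() { refCnt++; return this; }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
            refCnt--;
        }

        int refCnt() { return refCnt; }
    }

    /** Accumulates retained chunks until completion or cancellation. */
    static final class Aggregator {
        private final List<CountedChunk> retained = new ArrayList<>();
        private final boolean releaseOnCancel;

        Aggregator(boolean releaseOnCancel) { this.releaseOnCancel = releaseOnCancel; }

        // The source drops its own reference after delivery, so the
        // aggregator retains each chunk to keep it alive.
        void onNext(CountedChunk chunk) { retained.add(chunk.retain()); }

        // Normal completion: join the data and release every retained chunk.
        String onComplete() {
            StringBuilder joined = new StringBuilder();
            for (CountedChunk c : retained) {
                joined.append(c.data);
                c.release();
            }
            retained.clear();
            return joined.toString();
        }

        // Cancellation (for example from a timeout): without the cleanup,
        // the retained references are never dropped.
        void cancel() {
            if (releaseOnCancel) {
                for (CountedChunk c : retained) {
                    c.release();
                }
            }
            retained.clear();
        }
    }

    /** Feeds three chunks, cancels, and counts the chunks that are still referenced. */
    static int leakedAfterCancel(boolean releaseOnCancel) {
        List<CountedChunk> all = new ArrayList<>();
        Aggregator aggregator = new Aggregator(releaseOnCancel);
        for (int i = 0; i < 3; i++) {
            CountedChunk chunk = new CountedChunk("foo" + i);
            all.add(chunk);
            aggregator.onNext(chunk);
            chunk.release(); // the source drops its own reference
        }
        aggregator.cancel();
        int leaked = 0;
        for (CountedChunk c : all) {
            if (c.refCnt() > 0) {
                leaked++;
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + leakedAfterCancel(false));
        System.out.println("with cleanup: " + leakedAfterCancel(true));
    }
}
```

In this model leakedAfterCancel(false) returns 3 and leakedAfterCancel(true) returns 0, which is the difference a cancellation hook makes when a timeout cancels mid-aggregation.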
Comment From: violetagg
@rstoyanchev I'm not able to reproduce the issue.
Only these exceptions appear in the log files:
1. https://github.com/reactor/reactor-netty/issues/636
2.
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1441)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1373)
at io.netty.buffer.PooledHeapByteBuf.nioBuffer(PooledHeapByteBuf.java:298)
at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1224)
at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:266)
at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:207)
at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)
at org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:384)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.CombinedChannelDuplexHandler.chan
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at java.base/java.lang.Thread.run(Thread.java:834)
I'm using Netty 4.1.34.Final and Reactor Netty 0.8.6.BUILD-SNAPSHOT
Comment From: danielra
I just took another look at this. I've upgraded the spring-boot version from 2.1.2.RELEASE to 2.1.3.RELEASE, and thus spring-framework 5.1.5.RELEASE and reactor-netty 0.8.5.RELEASE.
It looks like I may have just gotten "lucky" in my initial runs of my test cases and only happened to see leak reports when running the one that uses the WebClient. I do still see leak messages emitted when running both test cases (but only sometimes).
I ran the following 20 times (running just the test case which uses a WebClient which uses reactor-netty):
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient
and here is the output from the 6 of those times where leak reports were emitted: webclient-test-log-with-leak_1.txt webclient-test-log-with-leak_2.txt webclient-test-log-with-leak_3.txt webclient-test-log-with-leak_4.txt webclient-test-log-with-leak_5.txt webclient-test-log-with-leak_6.txt
I ran the following 20 times (running just the test case which uses reactor-netty directly):
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty
and here is the output from the 4 of those times where leak reports were emitted: reactor-netty-test-log-with-leak_1.txt reactor-netty-test-log-with-leak_2.txt reactor-netty-test-log-with-leak_3.txt reactor-netty-test-log-with-leak_4.txt
There is some variety to the leak reports. One general observation is that in the direct reactor-netty cases, the leak reports were always accompanied by IllegalReferenceCountExceptions like the one noted above. In contrast, none of the WebClient test runs which emitted leak reports contained this exception (though I did see this exception on a few runs which did not emit leak reports). I don't know whether this detail is a coincidence or not.
I've pushed a few minor changes to the test cases along with the version update noted above.
Comment From: rstoyanchev
I'm using Netty 4.1.34.Final and Reactor Netty 0.8.6.BUILD-SNAPSHOT
@danielra did you notice the versions mentioned? Can you also try with:
ext['reactor-bom.version'] = 'Californium-BUILD-SNAPSHOT'
Comment From: danielra
Thank you for calling that out! I did miss updating that version. I have done so now (and pushed the change), and on my first attempt with the webClient test, I observed this leak report emitted via:
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient
webclient-test-log-with-leak-updated-reactor_1.txt
It took a few runs, but I also observed a leak report in the test case that uses the reactor-netty client directly via:
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty
reactor-netty-test-log-with-leak-updated-reactor_1.txt
FWIW, the pattern of correlation with IllegalReferenceCountExceptions occurring when leak reports are emitted by the direct reactorNetty test (but not in the webClient test) persisted.
Comment From: danielra
Sorry, that was with the updated reactor-bom but Netty 4.1.33.Final instead of Netty 4.1.34.Final. I've updated the netty version as well and captured the following two leak reports:
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient
webclient-test-log-with-leak-updated-reactor-and-netty_1.txt
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty
reactor-netty-test-log-with-leak-updated-reactor-and-netty_1.txt
Comment From: violetagg
@danielra Can you try 0.8.6.BUILD-SNAPSHOT now? I committed the fix for reactor/reactor-netty#636.
Comment From: violetagg
I created an issue for the exception that I observed when running the test with WebClient: #22594
Comment From: danielra
Thanks!
I just tried a few runs of the test cases with the 0.8.6.BUILD-SNAPSHOT version of reactor-netty, and pushed that dependency change to the demo project. Here is example output from each test case from runs which emitted leak reports:
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient
webclient-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_1.txt
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty
reactor-netty-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_1.txt
Comment From: violetagg
@danielra According to the log, you are still using a 0.8.6.BUILD-SNAPSHOT without the fix above:
08:57:04.793 [DEBUG] [TestEventLogger] io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:604)
08:57:04.793 [DEBUG] [TestEventLogger] io.netty.buffer.CompositeByteBuf$Component.transferTo(CompositeByteBuf.java:1794)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:464)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:266)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:222)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:165)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.WrappedCompositeByteBuf.addComponent(WrappedCompositeByteBuf.java:493)
08:57:04.794 [DEBUG] [TestEventLogger] io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.addComponent(AdvancedLeakAwareCompositeByteBuf.java:885)
08:57:04.794 [DEBUG] [TestEventLogger] reactor.netty.ByteBufFlux.lambda$aggregate$7(ByteBufFlux.java:259)
https://github.com/reactor/reactor-netty/commit/32be1c777c9936b8623c05c7d577b83f956f8589
Comment From: violetagg
@rstoyanchev I see the stack below. Have you seen something like this before?
2019-03-15 11:08:33.670 ERROR 35536 --- [r-http-kqueue-4] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:94)
io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:208)
reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:321)
reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:538)
reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:158)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:362)
io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:181)
io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:259)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
java.lang.Thread.run(Thread.java:748)
#2:
org.springframework.http.client.reactive.ReactorClientHttpResponse.lambda$getBody$2(ReactorClientHttpResponse.java:77)
reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:205)
reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:321)
reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:538)
reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:158)
Comment From: danielra
Oh, sorry. I thought specifying the version as 0.8.6.BUILD-SNAPSHOT would be enough to pick up your recent change.
I've now instead cloned the reactor-netty project and included the version built from the head of master (which does include the linked change) via an includeBuild entry in my demo project's settings.gradle file. Running the test case in this way, I still saw similar looking output after a few tries:
./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty
reactor-netty-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_2.txt
Comment From: rstoyanchev
@violetagg with 0.8.6 snapshots I see this from webClient tests:
#1:
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
...
#2:
Hint: Caller of readInbound() will handle the message from this point.
...
#3:
Hint: 'DefaultChannelPipeline$TailContext#0' will handle the message from this point.
...
#4:
io.netty.buffer.AdvancedLeakAwareByteBuf.ensureWritable(AdvancedLeakAwareByteBuf.java:136)
And this with reactorNetty test:
#1:
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
...
as well as this:
#1:
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
...
#2:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
#3:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
#4:
io.netty.buffer.AdvancedLeakAwareByteBuf.getByte(AdvancedLeakAwareByteBuf.java:154)
...
#5:
io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106)
...
#6:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
#7:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
#8:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
#9:
io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedByte(AdvancedLeakAwareByteBuf.java:160)
...
#10:
Hint: 'reactor.left.httpCodec' will handle the message from this point.
...
#11:
Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
...
As for something like:
#2:
org.springframework.http.client.reactive.ReactorClientHttpResponse.lambda$getBody$2(ReactorClientHttpResponse.java:77)
The only way I can explain this in the given scenario, i.e. decodeToMono(String.class), is some issue here, which would imply something is potentially wrong with the doOnDiscard hook. I could try to simulate and prove this by feeding DataBufferUtils#join or StringDecoder with pooled buffers via Flux#interval. Are you seeing this consistently? I have not yet. And do you no longer see any other leaks?
Comment From: rstoyanchev
The only way I can explain in the given scenario, i.e. decodeToMono(String.class), is some issue here which would imply something potentially wrong with the doOnDiscard hook.
I have tried and failed to reproduce the issue with DataBufferUtils only using a test like this:
@Test
public void joinWithCancellation() {
    Mono<DataBuffer> bufferFlux = DataBufferUtils.join(
            Flux.interval(Duration.ofMillis(40)).take(50).map(aLong -> stringBuffer("foo" + aLong)));
    StepVerifier.create(bufferFlux)
            .thenAwait(Duration.ofMillis(ThreadLocalRandom.current().nextInt(81, 116)))
            .thenCancel()
            .verify();
}
This is in DataBufferUtilsTests so at the end of the test there is a verifyAllocations check.
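For readers unfamiliar with this kind of end-of-test check, the general idea can be modeled in plain JDK code: a factory remembers every buffer it hands out and, once the test is done, reports any that were never released. This is a sketch under assumptions, not Spring's test infrastructure. TrackedBuffer, TrackingFactory, findLeaks, and verifyAllReleased are hypothetical names chosen for illustration, and the real verifyAllocations check may work differently.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of an allocation check; not Spring's test infrastructure.
final class AllocationCheckSketch {

    /** Hypothetical reference-counted buffer. */
    static final class TrackedBuffer {
        private int refCnt = 1; // one reference on allocation
        final String content;

        TrackedBuffer(String content) { this.content = content; }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
            refCnt--;
        }

        boolean isAllocated() { return refCnt > 0; }
    }

    /** Hands out buffers and remembers each one for a later leak check. */
    static final class TrackingFactory {
        private final List<TrackedBuffer> created = new ArrayList<>();

        TrackedBuffer allocate(String content) {
            TrackedBuffer buffer = new TrackedBuffer(content);
            created.add(buffer);
            return buffer;
        }

        /** Returns the contents of all buffers that were never fully released. */
        List<String> findLeaks() {
            List<String> leaks = new ArrayList<>();
            for (TrackedBuffer b : created) {
                if (b.isAllocated()) {
                    leaks.add(b.content);
                }
            }
            return leaks;
        }

        /** Fails the way an end-of-test check would if anything is still allocated. */
        void verifyAllReleased() {
            List<String> leaks = findLeaks();
            if (!leaks.isEmpty()) {
                throw new AssertionError("Leaked buffers: " + leaks);
            }
        }
    }

    public static void main(String[] args) {
        TrackingFactory factory = new TrackingFactory();
        TrackedBuffer released = factory.allocate("foo0");
        factory.allocate("foo1"); // never released on purpose
        released.release();
        System.out.println("leaks: " + factory.findLeaks());
    }
}
```

A check like this is deterministic, unlike leak reports that depend on garbage collection timing, which may be part of why the reports in this thread only appear on some runs.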
@danielra since you have a reproducible test case, I suggest creating a new issue under https://github.com/reactor/reactor-netty.
Comment From: violetagg
Here is the issue for Reactor Netty https://github.com/reactor/reactor-netty/issues/700
Comment From: jkjome
I am using the latest version of Spring Boot (2.2.6) and even though this issue is supposed to have been fixed, I'm seeing the same memory leak being reported: "LEAK: ByteBuf.release() was not called before it's garbage-collected". Is there some place in the code below where I am supposed to be, somehow, calling ByteBuf.release(), even though it seems like Spring and Netty internal classes should be taking care of this? Or have I, otherwise, written the code improperly and should be doing it some other, better, way?
Edit: I should mention, I found the following two "Hint" messages in the stacktrace....
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
Hint: 'reactor.left.httpCodec' will handle the message from this point.
public StreamingResponseBody getSomeClientCertProtectedPdf(final HttpServletResponse response) {
    final String urlStr = "https://somedomain.com/somepathto/somepdf.pdf";
    final HttpClient httpClient = HttpClient.create().secure(spec -> {
        final KeyManagerFactory keyManagerFactory = ....;
        final TrustManagerFactory trustManagerFactory = ....;
        spec.sslContext(SslContextBuilder.forClient()
                .keyManager(keyManagerFactory)
                .trustManager(trustManagerFactory)
                .build());
    });
    final WebClient client = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl(urlStr).build();
    final Flux<DataBuffer> dataBufferFlux = client.get()
            .accept(MediaType.APPLICATION_PDF)
            .retrieve()
            .bodyToFlux(DataBuffer.class);
    return out -> DataBufferUtils.write(dataBufferFlux, response.getOutputStream()).blockLast(Duration.ofSeconds(20));
}
Comment From: rstoyanchev
@jkjome it may sound similar, but in your scenario you're getting raw data from the WebClient, which means you now have the responsibility to release the buffers. You then use DataBufferUtils#write, and its Javadoc says:
Does not close the output stream when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().
You can follow that recommendation, or use bodyToFlux(byte[].class) or bodyToFlux(ByteBuffer.class), which will release the underlying ByteBufs and give you a copy.
Comment From: jkjome
@rstoyanchev Thanks for your help! I finally found something that worked to get rid of ByteBuf.release() memory leak messages....
return out -> DataBufferUtils.write(dataBufferFlux, out).doOnNext(DataBufferUtils.releaseConsumer()).blockLast(Duration.ofSeconds(20));