Source code:

package com.yl.platform.gateway.filter;

import com.yl.platform.gateway.constant.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR;

/**
 * Global gateway filter that captures a (possibly truncated) textual copy of the request body
 * and stores it on the exchange under {@link Constants#REQUEST_BODY_LOG_ATTR}.
 *
 * <p>The body is read from the cached request decorator created by
 * {@link ServerWebExchangeUtils#cacheRequestBody}, and the same cached request is handed to the
 * rest of the filter chain.
 *
 * <p>Capturing the body is best-effort: if reading it fails, the failure is logged and the
 * request continues through the chain without the attribute being set.
 */
@Component
public class RequestBodyLogGlobalFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(RequestBodyLogGlobalFilter.class);

    /** Codec readers used to decode the cached request body into a byte array. */
    private final List<HttpMessageReader<?>> messageReaders;

    /**
     * Maximum number of body bytes copied into the log text; longer bodies are truncated and
     * suffixed with {@code "......"}.
     *
     * <p>NOTE(review): this key uses the {@code yl.platform.gateway.logging} prefix while the two
     * properties below use {@code gateway.logging} — confirm whether that is intentional.
     */
    @Value("${yl.platform.gateway.logging.request-body-log-max-byte-length:512}")
    private int requestBodyLogMaxByteLength;

    /**
     * Whether request body logging is enabled.
     */
    @Value("${gateway.logging.request-body-log-enabled:true}")
    private boolean requestBodyLogEnabled;

    /**
     * Content-Length threshold in bytes for non-JSON requests: such a request is only captured
     * when its declared Content-Length is below this value.
     */
    @Value("${gateway.logging.request-body-max-readable-byte-count:1024}")
    private int requestBodyMaxReadableByteLength;

    /**
     * Creates the filter.
     *
     * @param codecConfigurer source of the {@link HttpMessageReader}s used to decode the body
     */
    public RequestBodyLogGlobalFilter(final ServerCodecConfigurer codecConfigurer) {
        this.messageReaders = codecConfigurer.getReaders();
    }

    /**
     * Captures the request body for logging when the request qualifies, then continues the chain.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        if (!shouldRequestBody(exchange)) {
            return chain.filter(exchange);
        }

        // The body text was already captured for this exchange; do not read it again.
        if (exchange.getAttribute(Constants.REQUEST_BODY_LOG_ATTR) != null) {
            return chain.filter(exchange);
        }

        ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
        if (cachedRequest == null) {
            // No cached decorator yet: ask the gateway to cache the body first.
            return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest -> {
                // The callback received the original request object itself rather than a new
                // decorator, so there is no cached body to read; continue unchanged.
                if (exchange.getRequest() == serverHttpRequest) {
                    return chain.filter(exchange);
                }
                return requestBodyStoreToExchange(exchange, chain, serverHttpRequest);
            }));
        } else {
            return requestBodyStoreToExchange(exchange, chain, cachedRequest);
        }
    }

    /**
     * Reads the cached body, stores its log text on the exchange, and continues the chain with
     * the cached request.
     *
     * <p>Errors raised while reading or formatting the body (for example a
     * {@code DataBufferLimitException} when the body exceeds {@code spring.codec.max-in-memory-size})
     * are logged and swallowed, so the chain is always invoked. Previously such an error aborted
     * the pipeline before {@code chain.filter(...)} was called.
     *
     * <p>NOTE(review): skipping the chain presumably also skipped the gateway filter that releases
     * the cached body buffer, which would explain the reported Netty {@code LEAK: ByteBuf.release()}
     * message — confirm against the Spring Cloud Gateway version in use.
     *
     * @param exchange      the current server exchange (receives the log attribute)
     * @param chain         the remaining gateway filter chain
     * @param cachedRequest request decorator whose body can be read repeatedly
     * @return completion signal of the downstream chain
     */
    private Mono<Void> requestBodyStoreToExchange(ServerWebExchange exchange, GatewayFilterChain chain, ServerHttpRequest cachedRequest) {
        final ServerRequest serverRequest = ServerRequest
                .create(exchange.mutate().request(cachedRequest).build(), messageReaders);
        return serverRequest.bodyToMono(byte[].class)
                .doOnNext(requestBody -> {
                    String requestBodyLog = getRequestBodyLog(requestBody);
                    if (log.isDebugEnabled()) {
                        log.debug(exchange.getLogPrefix() + "requestBodyLog is:" + requestBodyLog);
                    }
                    if (StringUtils.isNotEmpty(requestBodyLog)) {
                        exchange.getAttributes().put(Constants.REQUEST_BODY_LOG_ATTR, requestBodyLog);
                    }
                })
                // Only errors from reading/formatting the body are handled here; errors from the
                // downstream chain (subscribed in then(...)) still propagate unchanged.
                .onErrorResume(ex -> {
                    log.warn(exchange.getLogPrefix()
                            + "failed to read request body for logging, continuing without it", ex);
                    return Mono.empty();
                })
                .then(Mono.defer(() -> chain.filter(exchange.mutate().request(cachedRequest).build())));
    }

    /**
     * Runs with the highest precedence, i.e. before AuthorizationGlobalFilter.
     *
     * <p>NOTE(review): Spring Cloud Gateway's own cached-body cleanup filter appears to use the
     * same order value — confirm the relative ordering if buffer release matters here.
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    /**
     * Decides whether the body of this request should be captured.
     *
     * <p>Capture happens only when logging is enabled and a Content-Type is present, and then
     * either the type is compatible with {@code application/json} or the declared Content-Length
     * is below {@link #requestBodyMaxReadableByteLength}.
     *
     * <p>NOTE(review): {@code getContentLength()} returns {@code -1} when the header is absent,
     * which also satisfies the "below threshold" test — confirm that bodies of unknown length are
     * meant to be captured.
     */
    private boolean shouldRequestBody(ServerWebExchange exchange) {
        // Cheap flag check first; no need to touch the headers when logging is off.
        if (!requestBodyLogEnabled) {
            return false;
        }

        final MediaType contentType = exchange.getRequest().getHeaders().getContentType();
        if (contentType == null) {
            return false;
        }

        if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
            return true;
        }

        final long contentLength = exchange.getRequest().getHeaders().getContentLength();
        return contentLength < requestBodyMaxReadableByteLength;
    }

    /**
     * @return the maximum number of body bytes included in the log text
     */
    public int getRequestBodyLogMaxByteLength() {
        return requestBodyLogMaxByteLength;
    }

    /**
     * Converts the raw body into the text stored for logging.
     *
     * @param buf the raw request body, may be {@code null}
     * @return {@code null} for a {@code null} or empty body; otherwise the first
     *         {@link #getRequestBodyLogMaxByteLength()} bytes decoded as UTF-8, suffixed with
     *         {@code "......"} when the body was truncated; {@code "[unknown]"} if decoding fails
     */
    private String getRequestBodyLog(byte[] buf) {
        if (buf == null || buf.length == 0) {
            return null;
        }
        final int maxLength = getRequestBodyLogMaxByteLength();
        try {
            final int length = Math.min(buf.length, maxLength);
            // The Charset overload cannot throw UnsupportedEncodingException.
            String requestBodyToUse = new String(buf, 0, length, StandardCharsets.UTF_8);
            if (buf.length > maxLength) {
                requestBodyToUse = requestBodyToUse + "......";
            }
            return requestBodyToUse;
        } catch (RuntimeException ex) {
            // e.g. a negative configured max length makes the String constructor reject the range.
            log.error("ex", ex);
            return "[unknown]";
        }
    }

}

Comment From: carl-HelloWorld

In the filter above, `serverRequest.bodyToMono(byte[].class)` may cause a memory leak when it throws `DataBufferLimitException`. JVM options used to reproduce: `-Dspring.codec.max-in-memory-size=1 -Dio.netty.leakDetection.level=ADVANCED -Dlogging.level.reactor.ipc.netty=DEBUG`. Keywords: Spring `DataBufferUtils.join` and Netty leak, `LEAK: ByteBuf.release()`.

Spring  DataBufferUtils.join and Netty Leak   LEAK: ByteBuf.release()

img_v3_02hm_c1336dca-2bff-4541-bb90-8ff2e5b0d5eg

2024-12-18 17:53:18.498 logback [reactor-http-nio-6] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records:

1:

    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:285)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

2:

    io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106)
    io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:333)
    io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:123)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

3:

    io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
    io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:908)
    io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:593)
    io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:255)
    io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:123)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

4:

    io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
    io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:908)
    io.netty.handler.codec.http.HttpObjectDecoder$LineParser.parse(HttpObjectDecoder.java:965)
    io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:236)
    io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:123)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

5:

    Hint: 'reactor.left.httpCodec' will handle the message from this point.
    io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

6:

    Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
    io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)

7:

    io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:634)
    io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:750)
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:750)
7 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.

Comment From: bclozel

Duplicates #34080

Please don't create duplicate issues. Again, copying many lines of code without a minimal sample application won't help here. You're suggesting that `ServerRequest.bodyToMono` leaks buffers when the data is too large. This seems quite easy to reproduce without involving a custom Spring Cloud Gateway filter. Please provide a minimal sample application that we can git clone or unzip and that demonstrates the problem.

I wrote a test checking for this and couldn't reproduce the issue you're facing, see https://github.com/bclozel/spring-framework/tree/gh-34113. I'm closing this issue for now until there is a concrete case for us to address.

Comment From: carl-HelloWorld

Sorry, the first step requires setting the startup parameters:

-Dio.netty.leakDetection.level = debug

-Dspring.codec.max-in-memory-size = 1

Step 2: Send requests in an endless loop, each with a request body longer than 1 byte. `io.netty.util.ResourceLeakDetector#reportLeak` will then print the memory leak log.

@bclozel

Comment From: bclozel

I have shared a unit test that does just that and cannot reproduce the problem.

Comment From: carl-HelloWorld

> I have shared a unit test that does just that and cannot reproduce the problem.

Thank you. The source code for a reproducer is available here: https://github.com/carl-HelloWorld/spring-gateway-leak-demo.git

First, set a breakpoint in `ResourceLeakDetector.reportUntracedLeak`. The memory leak log will appear within a minute. @bclozel @nertzy @spring-projects-issues

Comment From: carl-HelloWorld

Hello, I have now provided a minimal test project in which the problem can be reproduced. How can I fix this?