@Component public class ResponseBodyLogGlobalFilter implements GlobalFilter, Ordered {

private static final Log log = LogFactory.getLog(ResponseBodyLogGlobalFilter.class);

@Value("${gateway.logging.response-max-payload-length:200}")
private int responseMaxPayLoadLength;

public int getResponseMaxPayLoadLength() {
    return responseMaxPayLoadLength;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange.mutate()
            .response(new ResponseBodyServerHttpResponse(exchange)).build());
}

@Override
public int getOrder() {
    // NettyWriteResponseFilter的顺序默认为-1,在NettyWriteResponseFilter之前即可
    return -10;
}

private class ResponseBodyServerHttpResponse extends ServerHttpResponseDecorator {

    private final ServerWebExchange exchange;

    public ResponseBodyServerHttpResponse(ServerWebExchange exchange) {
        super(exchange.getResponse());
        this.exchange = exchange;
    }

    @SuppressWarnings("unchecked")
    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

        MediaType contentType = exchange.getResponse().getHeaders().getContentType();
        if (!shouldResponseBody(contentType)) {
            return super.writeWith(body);
        }

        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory originalBufferFactory = originalResponse.bufferFactory();

        // 参见NettyWriteResponseFilter返回的都是flux
        if (body instanceof Flux) {
            Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
            //todo flux.buffer() Does flux.buffer() cause a memory leak when an exception occurs?
            return super.writeWith(flux.buffer().map(dataBuffers  -> {
                // 合并多个流集合,解决返回体分段传输

                DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                DataBuffer joinDataBuffer = dataBufferFactory.join(dataBuffers);
                // 1kb = 1024kb,防止响应内容太大导致内存过大和影响速度
                if (joinDataBuffer.readableByteCount() > 0 && joinDataBuffer.readableByteCount() < 1024) {
                    byte[] originContent = new byte[joinDataBuffer.readableByteCount()];
                    joinDataBuffer.read(originContent);
                    // 释放掉内存
                   // todo There could be a leak in the code here because I myself have a new DefaultDataBufferFactory instead of 
                    NettyDataBufferFactory, will it cause a memory leak?
                    DataBufferUtils.release(joinDataBuffer);

                    String responseBodyLog = getResponseBodyLog(originContent);
                    if (log.isDebugEnabled()) {
                        log.debug(exchange.getLogPrefix() + "responseBodyLog is:" + responseBodyLog);
                    }
                    if (StringUtils.isNotEmpty(responseBodyLog)) {
                        exchange.getAttributes().put(Constants.RESPONSE_BODY_LOG_ATTR, responseBodyLog);
                    }
                    return originalBufferFactory.wrap(originContent);
                } else {
                    return joinDataBuffer;
                }
            }));
        }else {
            return super.writeWith(body);
        }

    }

    @Override
    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body).flatMapSequential(p -> p));
    }

    private boolean shouldResponseBody(MediaType contentType) {

        if (contentType == null) {
            return false;
        }

        if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
            return true;
        }

        return false;
    }

}

private String getResponseBodyLog(byte[] buf) {
    if (buf != null && buf.length > 0) {
        int length = Math.min(buf.length,getResponseMaxPayLoadLength());
        try {
            String responseBodyToUse = new String(buf, 0, length, StandardCharsets.UTF_8.name());
            if (buf.length > getResponseMaxPayLoadLength()) {
                responseBodyToUse = responseBodyToUse + "......";
            }
            return responseBodyToUse;
        }
        catch (UnsupportedEncodingException ex) {
            return "[unknown]";
        }
    }
    return null;
}

}

Comment From: bclozel

Can you move this to a sample project with a failing test? Looking at these code snippets, I am not sure whether buffers could leak, or how.

Comment From: carl-HelloWorld

> Can you move this to a sample project with a failing test? Looking at these code snippets I am not sure how buffers could leak and how.

Sorry, the issue has been updated. The code shown is the code currently deployed in production. I suspect that the memory is not freed either because an exception occurs while `flux.buffer()` is collecting the buffers, or because of the `new DefaultDataBufferFactory()` I create; see the TODO comments in the code for details.

```
LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
    io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:780)
    io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
    io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
```

```
ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
    io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
    io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
    io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:333)
    io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:123)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
```

Comment From: bclozel

Thanks but this is not actionable on our side. It's likely a bug in your code. You can use methods like DataBufferUtils.join if you want to achieve something like this.

Comment From: carl-HelloWorld

package com.yl.platform.gateway.filter;

import com.yl.platform.gateway.constant.Constants; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.core.Ordered; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono;

import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.List;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR;

/* * 请求体日志组件 / @Component public class RequestBodyLogGlobalFilter implements GlobalFilter, Ordered {

private static final Log log = LogFactory.getLog(RequestBodyLogGlobalFilter.class);

private final List<HttpMessageReader<?>> messageReaders;

@Value("${yl.platform.gateway.logging.request-body-log-max-byte-length:512}")
private int requestBodyLogMaxByteLength;

/**
 * 响应体中日志
 */
@Value("${yl.platform.gateway.logging.request-body-log-enabled:true}")
private boolean requestBodyLogEnabled;

/**
 * 请求体中最大的的可读字节数,如果大于配置则不打印响应体日志
 */
@Value("${yl.platform.gateway.logging.request-body-max-readable-byte-count:1024}")
private int requestBodyMaxReadableByteLength;

public RequestBodyLogGlobalFilter(final ServerCodecConfigurer codecConfigurer) {
    this.messageReaders = codecConfigurer.getReaders();
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    if (!shouldRequestBody(exchange)) {
        return chain.filter(exchange);
    }

    if (exchange.getAttribute(Constants.REQUEST_BODY_LOG_ATTR) != null) {
        return chain.filter(exchange);
    }

    ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
    if (cachedRequest == null) {
        return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest -> {
            if (exchange.getRequest() == serverHttpRequest) {
                return chain.filter(exchange);
            }
            return requestBodyStoreToExchange(exchange, chain, serverHttpRequest);
        }));
    }else {
        return requestBodyStoreToExchange(exchange, chain, cachedRequest);
    }
}

private Mono<Void> requestBodyStoreToExchange(ServerWebExchange exchange, GatewayFilterChain chain, ServerHttpRequest cachedRequest) {
    final ServerRequest serverRequest = ServerRequest
            .create(exchange.mutate().request(cachedRequest).build(), messageReaders);
    // serverRequest.bodyToMono(byte[].class) maybe nettry leak ? when throw new DataBufferLimitException
    return serverRequest.bodyToMono(byte[].class).doOnNext(requestBody -> {
        String requestBodyLog = getRequestBodyLog(requestBody);
        if (log.isDebugEnabled()) {
            log.debug(exchange.getLogPrefix() + "requestBodyLog is:" + requestBodyLog);
        }
        if (StringUtils.isNotEmpty(requestBodyLog)) {
            exchange.getAttributes().put(Constants.REQUEST_BODY_LOG_ATTR, requestBodyLog);
        }
    }).then(Mono.defer(() -> {
        return chain.filter(exchange.mutate().request(cachedRequest).build());
    }));
}

@Override
public int getOrder() {
    // AuthorizationGlobalFilter之前
    return Ordered.HIGHEST_PRECEDENCE;
}

private boolean shouldRequestBody(ServerWebExchange exchange) {
    final MediaType contentType = exchange.getRequest().getHeaders().getContentType();
    if (!requestBodyLogEnabled) {
        return false;
    }

    if (contentType == null) {
        return false;
    }

    if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
        return true;
    }

    long contentLength = exchange.getRequest().getHeaders().getContentLength();
    if (contentLength < requestBodyMaxReadableByteLength) {
        return true;
    }

    return false;
}

public int getRequestBodyLogMaxByteLength() {
    return requestBodyLogMaxByteLength;
}

private String getRequestBodyLog(byte[] buf) {
    if (buf != null && buf.length > 0) {
        int length = Math.min(buf.length,getRequestBodyLogMaxByteLength());
        try {
            String requestBodyToUse = new String(buf, 0, length, StandardCharsets.UTF_8.name());
            if (buf.length > getRequestBodyLogMaxByteLength()) {
                requestBodyToUse = requestBodyToUse + "......";
            }
            return requestBodyToUse;
        } catch (UnsupportedEncodingException ex) {
            return "[unknown]";
        } catch (Throwable ex) {
            log.error("ex", ex);
            return "[unknown]";
        }
    }
    return null;
}

}