LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:91)
    org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:75)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2402)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onSubscribe(MonoCollect.java:104)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
    reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
    reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    reactor.core.publisher.Mono.subscribe(Mono.java:4490)
    reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
    reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
    reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
    reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:148)
    reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666)
    reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)
    reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:651)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
    io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
    io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:748

Comment From: poutsma

If you'd like us to spend some time investigating, please take the time to provide a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem.

Comment From: children0001

How to contact you

Comment From: sbrannen

How to contact you

Simply provide your feedback as comments in this issue.

And as stated above, please provide a complete minimal sample that reproduces the problem.

Comment From: children0001

微信图片_20240307134413

Comment From: children0001

@Component
public class ModifyResponseDataFilter implements GlobalFilter, Ordered {
    private static final Logger logger = LoggerFactory.getLogger(ModifyResponseDataFilter.class);
    private static final String RX_REQUEST_KEY = "rx_requestId";
    //200 byte的长度
    private static final int K200 = 248576;
    @Value("${gateway.filter.excludeStaticSource}")
    private String excludeStaticSource;
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        logger.debug("ModifyResponseDataFilter54");
        exchange = defaultRxRequestId(exchange);
        logger.debug("ModifyResponseDataFilter56");
        ServerHttpResponse response = exchange.getResponse();
        DataBufferFactory orginDataBufferFactory = response.bufferFactory();
        logger.debug("ModifyResponseDataFilter59");
        ServerHttpRequest request = exchange.getRequest();
        String accessUri = request.getURI().getPath();
        String requestContent = request.getHeaders().getFirst("Content-Type");
        String requestMethod = request.getMethod().toString();
        String upgrade = exchange.getRequest().getHeaders().getUpgrade();
        logger.debug("gateway upgrade:{} accessUri:{}:{}",upgrade,request.getURI().getScheme(),accessUri);
        if(filterStaticSource(accessUri)) {
            logger.debug("ModifyResponseDataFilter chain.filter65");
            return chain.filter(exchange).doFinally(signalType -> {
                MDC.remove(RX_REQUEST_KEY);
            });
        }
        if(StringUtils.equalsIgnoreCase(exchange.getRequest().getHeaders().getUpgrade(),"websocket")){
            //不支持websocket服务
            logger.info("发现websocket请求,不执行返回值判断修改{}",accessUri);
            return chain.filter(exchange).doFinally(signalType -> {
                MDC.remove(RX_REQUEST_KEY);
            });
        }
        MediaType mediaType = response.getHeaders().getContentType();
        if (StringUtils.isBlank(requestContent)){
            requestContent = MediaType.APPLICATION_JSON.toString();
        }
        logger.debug("response的contentType:{}",mediaType);
//        if (null == mediaType || (!mediaType.includes(MediaType.APPLICATION_JSON) && !mediaType.includes(MediaType.APPLICATION_JSON_UTF8))) {
//            return chain.filter(exchange);
//        }
//        if(!((
//                StringUtils.equalsIgnoreCase(requestContent,MediaType.APPLICATION_JSON.toString())
//                        ||StringUtils.equalsIgnoreCase(requestContent,MediaType.APPLICATION_JSON_UTF8.toString()))
//                &&StringUtils.equalsIgnoreCase(requestMethod,"post"))){
//
//            return chain.filter(exchange);
//        }
        ServerWebExchange finalExchange = exchange;
        ServerHttpResponseDecorator serverHttpRequestDecorator = new ServerHttpResponseDecorator(response){
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
               // if(body instanceof  MonoJust ){

                    Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                Mono<Void> resall= super.writeWith(fluxBody.buffer().map(dataBuffers ->{
                        logger.debug("ModifyResponseDataFilter102");
                        DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                        //此databuffer需进行回收,放至finaly中处理
                        DataBuffer join = dataBufferFactory.join(dataBuffers);
                        logger.debug("ModifyResponseDataFilter106.length{}",join.readableByteCount());
                        byte[] contentByte = new byte[join.readableByteCount()];
                        try {
                            join.read(contentByte);
                            if(join!=null){
                                DataBufferUtils.release(join);
                            }
                            //如果响应过大,会进行截断,出现乱码,然后看api DefaultDataBufferFactory有个join方法可以合并所有的流,乱码的问题解决
                            logger.debug("ModifyResponseDataFilter114.time{}",System.currentTimeMillis());
                            HttpHeaders headers = finalExchange.getResponse().getHeaders();
                            logger.debug("ModifyResponseDataFilter116.time{}",System.currentTimeMillis());
                            MediaType contentType = headers.getContentType();
                            if (contentType != null && StringUtils.containsIgnoreCase(contentType.toString(),"application/json")) {
                                //  data.read(contentByte);
                                //   DataBufferUtils.release(data);
                                String result = "";
                                String content = new String(contentByte,StandardCharsets.UTF_8);
                                JSONObject jsonObject = JSONObject.parseObject(content);
                                String msg = jsonObject.getString("retMsg");
                                int retCode = jsonObject.getIntValue("retCode");
                                if(retCode == 0 && searchLowerCharSum(msg) > 10){
                                    logger.error("服务响应异常{}",content);
                                    jsonObject.put("retMsg","系统繁忙,请稍后再试");
                                }
                                if(contentByte.length > K200){
                                    logger.warn("accessUri:{},响应内容超过200K:{},长度为:{}",accessUri,content,contentByte.length);
                                }

//                           try{
//                               String encryText = RSAUtils.encryptByPrivateKey(content, RSAConstant.PRIVATE_KEY);
//                               jsonObject.put("encryText",encryText);
//                               jsonObject.put("encry",true);
//                           }catch (Exception se){
//                              logger.error("返回值加密错误",se);
//                           }
                                logger.debug("ModifyResponseDataFilter139");
                                result = jsonObject.toJSONString();
                                DataBuffer res1 =orginDataBufferFactory.wrap(result.getBytes(StandardCharsets.UTF_8));
                                logger.debug("ModifyResponseDataFilter142");
                                return  res1;
                            }
                            DataBuffer res2 =  orginDataBufferFactory.wrap(contentByte);
                            logger.debug("ModifyResponseDataFilter146");
                            return  res2;
                        }catch (Exception e){
                            logger.debug("ModifyResponseDataFilter149");
                            DataBuffer res3 = orginDataBufferFactory.wrap(contentByte);
                            return  res3;
                        }finally {
                            for(DataBuffer dataBuffer:dataBuffers){
                                if(dataBuffer!=null){
                                    DataBufferUtils.release(dataBuffer);
                                }
                            }
                            if(join!=null){
                                DataBufferUtils.release(join);
                            }                        }

                    }));
                logger.debug("ModifyResponseDataFilter163");
                return resall;
            }
        };
        logger.debug("ModifyResponseDataFilter chain.filter154");
          Mono<Void> res =chain.filter(exchange.mutate().response(serverHttpRequestDecorator).build()).doFinally(signalType -> {
            MDC.remove(RX_REQUEST_KEY);
        });
        logger.debug("ModifyResponseDataFilter chain.filter158");
        return res;
        //return chain.filter(exchange.mutate().response(responseDecorator(exchange)).build());
    }


    private ServerHttpResponseDecorator responseDecorator(ServerWebExchange exchange){
        return new ServerHttpResponseDecorator(exchange.getResponse()) {
            ServerHttpResponse serverHttpResponse = exchange.getResponse();
            DataBufferFactory bufferFactory = serverHttpResponse.bufferFactory();

            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                return super.writeWith(DataBufferUtils.join(Flux.from(body))
                        .map(dataBuffer -> {

                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            DataBufferUtils.release(dataBuffer);
                            return content;

                        }).flatMap(bytes -> {
                            byte[] contentByte = null;
                            MediaType mediaType = serverHttpResponse.getHeaders().getContentType();

                            if (null == mediaType || (!mediaType.includes(MediaType.APPLICATION_JSON) && !mediaType.includes(MediaType.APPLICATION_JSON_UTF8))) {
                                contentByte = bytes;
                            } else {
                                String bodyString = "";
                                int length = bytes.length;

                                if (!ObjectUtils.isEmpty(exchange.getResponse().getHeaders().get(HttpHeaders.CONTENT_ENCODING))
                                        && exchange.getResponse().getHeaders().get(HttpHeaders.CONTENT_ENCODING).contains("gzip")) {
                                    GZIPInputStream gzipInputStream = null;
                                    try {
                                        gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(bytes), length);
                                        StringWriter writer = new StringWriter();
                                        IOUtils.copy(gzipInputStream, writer, "UTF-8");
                                        bodyString = writer.toString();

                                    } catch (IOException e) {
                                        logger.error("====Gzip IO error", e);
                                    } finally {
                                        if (gzipInputStream != null) {
                                            try {
                                                gzipInputStream.close();
                                            } catch (IOException e) {
                                                logger.error("===Gzip IO close error", e);
                                            }
                                        }
                                    }
                                } else {
                                    bodyString = new String(bytes, StandardCharsets.UTF_8);
                                }
                                if(GateWayUtil.isJSON2(bodyString)){
                                    JSONObject jsonObject = JSONObject.parseObject(bodyString);
                                    if(jsonObject.getIntValue("status")!=0 && jsonObject.getIntValue("status")!= HttpStatus.OK.value()){
                                        jsonObject.put("message","系统繁忙,请稍后再试");
                                    }
                                    String msg = jsonObject.getString("retMsg");
                                    int retCode = jsonObject.getIntValue("retCode");
                                    if(retCode == 0 && StringUtils.isNotBlank(msg) &&searchLowerCharSum(msg) > 10){
                                        jsonObject.put("retMsg","系统繁忙,请稍后再试");
                                    }
                                   // jsonObject.put("go","go");
                                    bodyString = jsonObject.toJSONString();
                                    logger.info("bodyString: {}", bodyString);
                                    contentByte = bodyString.getBytes(StandardCharsets.UTF_8);
                                }

                            }


                            return Mono.just(bufferFactory.wrap(contentByte));
                        }));
            }
            @Override
            public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                return writeWith(Flux.from(body).flatMapSequential(p -> p));
            }
        };
        }


    private static int searchLowerCharSum(String str){
        int sum = 0;
        for (int i = 0, j = str.length(); i < j; i++) {
            if (str.charAt(i) >= 'a' && str.charAt(i) <= 'z'){
                ++sum;
            }
        }
        return sum;
    }


    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }


    private boolean filterStaticSource(String accessUrl) {
        int suffixPointIndex = accessUrl.lastIndexOf(".");

        if(suffixPointIndex<0) {
            return false;
        }

        String suffix = accessUrl.substring(suffixPointIndex+1,accessUrl.length());

        if(excludeStaticSource.contains(suffix)) {
            return true;
        }

        return false;
    }


    private ServerWebExchange defaultRxRequestId(ServerWebExchange exchange) {
        String uuid = exchange.getRequest().getHeaders().getFirst(RX_REQUEST_KEY);
        if (StringUtils.isBlank(uuid)) {
            uuid = UUID.randomUUID().toString();
        }
        MDC.put(RX_REQUEST_KEY, uuid);
        ServerHttpRequest newRequest = exchange
                .getRequest()
                .mutate()
                .header(RX_REQUEST_KEY, uuid).build();
        return exchange.mutate().request(newRequest).build();
    }
}

Comment From: children0001

orginDataBufferFactory.wrap(contentByte) will leak when contentByte size >10M

Comment From: children0001

NettyDataBufferFactory.wrap

Comment From: snicoll

I am afraid that's not the minimal sample we've requested. Please do not paste code and screenshot as they're not very useful. We need a sample we can run in the form of a minimal sample. You can attach a zip to this issue or push the code to a GitHub repository.

Comment From: rstoyanchev

The sample code above shows a lot of direct manipulation of response data buffers, and there is a good chance the leak is related to that, which would not be something we can fix. For example, the fluxBody.buffer() is buffering response buffers but there is no doOnDiscard in the chain to deal with releasing those buffers in case of an error or cancellation signals.

I haven't reviewed extensively, and it is not my goal to find all such issues. I suggest that you extract this code out and encapsulate into something re-usable that can be tested independent of the server, and then write some tests with NettyDataBufferFactory in various error scenarios, to make sure the code doesn't create any leaks.

Comment From: spring-projects-issues

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.