We have an endpoint like this:

@GetMapping(path = "/api/v1/stuff", produces = { APPLICATION_JSON_VALUE })
public Flux<Stuff> getStuff() {
    // ...
}

In our use case this endpoint can return tens of thousands of values. In production we see that the response is flushed frequently, most of the time after each value is written. The values can be quite small, so flushing after each one results in many small packets, which is wasteful due to per-packet TCP/IP overhead and makes poor use of the connection bandwidth.

This is not a streaming use case. The way the code is written implies that flushing after each value is not intentional: ReactorServerHttpResponse.writeWithInternal is called, as opposed to writeAndFlushWithInternal.

I was not able to reproduce the behavior by simply returning the values from memory. I could only reproduce it when streaming data from the database, in our case PostgreSQL. I'm attaching a simple demo project, which on my machine reproduces the flushing consistently. Start the application and run

curl http://localhost:8080/api/v1/stuff

demo-flush.zip

Spring Boot version: 3.3.1

Comment From: sdeleuze

Thanks for the repro. In your use case, ReactorServerHttpResponse.writeWithInternal is indeed invoked as expected, which in turn invokes NettyOutbound#send with the flushing predicate set to false.

Could you please share more on how you identified that flushing happens after each element returned by the Flux? Is it by debugging at the Netty level, at the network level, etc.?

Comment From: 0xabadea

By looking at the network traffic. Attaching two capture files:

  • flush-spring-3.3.1.pcapng.gz shows the behavior in Spring Boot 3.3.1 -- many small packets.
  • flush-spring-2.7.3.pcapng.gz shows the behavior in Spring Boot 2.7.3 -- packets ~32K in size. I'm adding this just to show what I consider to be the right behavior. The implementation in 2.7.3 was different: it collected the Flux into a List and Jackson-encoded the whole list.

Comment From: sdeleuze

@violetagg @rstoyanchev Is it expected that Reactor Netty flushes on every Flux element for the following use case?

ReactorServerHttpResponse.writeWithInternal is indeed invoked as expected, which in turn invokes NettyOutbound#send with the flushing predicate set to false.

Comment From: violetagg

@0xabadea Please enable wiretap and provide the logs (see below for how to do it). We do try to combine the messages and to flush in batches, but it depends on the Publisher. We work with a prefetch value of 128, which means we will try to request 128 items and flush them as one batch. However, if your Publisher produces items only one by one, there is nothing we can do. But let's see what the wiretap shows.

@Component
public class MyNettyWebServerCustomizer
        implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {
    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer -> httpServer.wiretap(true));
    }
}
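
Note that wiretap(true) only installs Netty's LoggingHandler on the channel pipeline; to actually see its output, the corresponding logger must also be enabled at DEBUG level. Assuming the default wiretap category used by Reactor Netty's HttpServer (the exact logger name is an assumption and may vary by version), something like this in application.properties:

# Assumption: wiretap output for HttpServer goes through this logger at DEBUG
logging.level.reactor.netty.http.server.HttpServer=DEBUG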

Comment From: 0xabadea

Attaching the wiretap logs.

@violetagg, could you please advise? How do you suggest I change the publisher so that it publishes more effectively than one by one? Eventually, onNext is still going to be called for each individual value, so my understanding is that MonoSendMany should buffer several values before flushing.

Comment From: 0xabadea

wiretap-logs.txt.gz

Comment From: rstoyanchev

This change was made in #28398. In your case, with lots of small items, it makes sense to aggregate, but in other cases it is better to stream. After the change we no longer aggregate by default, but you can still choose to aggregate by using flux.collectList() and returning Mono<List<Stuff>>.
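
For illustration, here is a minimal sketch of that opt-in aggregation; stuffService is a hypothetical stand-in for whatever currently produces the Flux:

@GetMapping(path = "/api/v1/stuff", produces = { APPLICATION_JSON_VALUE })
public Mono<List<Stuff>> getStuff() {
    // collectList() buffers all elements, so the whole JSON array is
    // encoded and written in one go instead of element by element
    return stuffService.findAll().collectList();
}

The trade-off is that the full result set is held in memory before anything is written, which is essentially what the pre-3.x behavior did.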

Comment From: violetagg

@0xabadea In the provided logs I can see that sometimes we flush many writes and sometimes only one, which is exactly how the implementation in Reactor Netty works (async flushes that flush whatever has been collected in the outbound buffer). If this is not enough, then either apply @rstoyanchev's recommendation or consider adding https://netty.io/4.1/api/io/netty/handler/flush/FlushConsolidationHandler.html to the Reactor Netty pipeline. We cannot add this handler by default because it requires configuration that only the owner of the solution can decide on.

Comment From: 0xabadea

Thank you for the pointer to FlushConsolidationHandler. I gave it a try, but it made no difference. This is how I'm adding it to the pipeline:

@Component
public class MyNettyWebServerCustomizer implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer ->
                httpServer.doOnConnection(conn -> conn.addHandlerFirst(new FlushConsolidationHandler(500, true)))
        );
    }
}

I tried both true and false for the consolidateWhenNoReadInProgress parameter, but that didn't make a difference either.

Comment From: violetagg

@0xabadea I assume that you are using HTTP/1.1 (if that's not the case, please tell me). Try the following:

httpServer.doOnConnection(conn -> conn.channel().pipeline()
        .addBefore(NettyPipeline.HttpCodec, "some-name", new FlushConsolidationHandler(500, true)))

Comment From: 0xabadea

@violetagg Indeed, at least the reproducer uses HTTP/1.1. I followed your suggestion from yesterday, but unfortunately I got the same behavior of many small flushes.

Comment From: violetagg

@0xabadea Then I will need an example project to play with.

Comment From: 0xabadea

An example project is attached to the original description. The only change I made to it was adding:

@Component
public class MyNettyWebServerCustomizer implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer ->
                httpServer.doOnConnection(conn -> conn.channel().pipeline()
                        .addBefore(NettyPipeline.HttpCodec, "some-name", new FlushConsolidationHandler(500, true)))
        );
    }
}

Comment From: violetagg

@0xabadea Yeah, you are right: in your case FlushConsolidationHandler ends up using the same strategy as Reactor Netty (scheduled flushes), but you can take a look at its source and adapt it into a custom flush-consolidating handler.
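
For what it's worth, here is a deliberately simplified sketch of such a custom handler: it suppresses intermediate flushes until a fixed number of flush calls have accumulated. This is an illustration only, not FlushConsolidationHandler's actual algorithm; a production version would also need to flush on read completion, on writability changes, and on a scheduled timeout, which is exactly the bookkeeping FlushConsolidationHandler does:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

public class CountingFlushHandler extends ChannelDuplexHandler {

    private final int threshold;
    private int pendingFlushes;

    public CountingFlushHandler(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        // let a flush through only every 'threshold' calls, or as soon as the
        // outbound buffer fills up and the channel stops being writable
        if (++pendingFlushes >= threshold || !ctx.channel().isWritable()) {
            pendingFlushes = 0;
            ctx.flush();
        }
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        // flush whatever is still buffered before the connection goes away
        if (pendingFlushes > 0) {
            pendingFlushes = 0;
            ctx.flush();
        }
        ctx.close(promise);
    }
}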

Comment From: snicoll

I am going to close this now as it is out of scope for this project. Thanks @violetagg for following up.