Would it be possible to send a Flux of objects in the request body using WebClient with "Content-Type: application/json" without having to collect the whole Flux into a list in memory?
Let's say I have a Flux containing millions of elements and my target server only accepts non-streaming content types. The Jackson2JsonEncoder would then call "collectList()" on the Flux, possibly running out of memory.
Couldn't the Jackson2JsonEncoder somehow write the objects as they become available?
Comment From: poutsma
It is possible, but not with application/json as the Content-Type; it needs to be application/x-ndjson instead.
There is significant overhead in writing JSON elements individually rather than collectively. That is why the Jackson2JsonEncoder collects to a list by default and serializes that. If you specify a streaming media type (application/x-ndjson by default, but changeable via the streamingMediaTypes property), then the Jackson2JsonEncoder will not collect the list but will write the elements as they arrive, with a newline in between them.
Does that answer your question?
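To make the wire format concrete, here is a minimal JDK-only sketch of newline-delimited framing. It is not Spring's encoder; `NdjsonSketch`, `writeNdjson`, and the `Consumer<String>` sink standing in for the connection are hypothetical names used only for illustration. The point is that each value can be handed off as soon as it is available, because no enclosing brackets or separators depend on knowing where the sequence ends.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: hypothetical helper, not part of Spring or Jackson.
class NdjsonSketch {

    // Hands each already-serialized JSON value to the sink as soon as it is pulled
    // from the iterator, followed by a newline. No intermediate list is built.
    static void writeNdjson(Iterator<String> jsonValues, Consumer<String> sink) {
        while (jsonValues.hasNext()) {
            sink.accept(jsonValues.next() + "\n");
        }
    }

    public static void main(String[] args) {
        StringBuilder wire = new StringBuilder(); // stands in for the network connection
        writeNdjson(List.of("{\"id\":1}", "{\"id\":2}").iterator(), wire::append);
        System.out.print(wire);
    }
}
```

The result is one JSON value per line, which is why a receiver has to understand application/x-ndjson rather than plain application/json.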
Comment From: micopiira
Hey, I'm trying to POST to a server that only accepts application/json. With something more low-level than WebClient, like reactor-netty, I could do something like this:
    Flux<String> flux = Flux.range(0, 1000000).map(Object::toString);
    final Mono<HttpClientResponse> response = httpClient.headers(headers -> {
                headers.set(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());
                headers.set(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON.toString());
            }).post()
            .uri("/url")
            .send(ByteBufFlux.fromString(flux
                    .map(o -> {
                        try {
                            return objectMapper.writeValueAsString(o);
                        } catch (JsonProcessingException e) {
                            throw new RuntimeException(e);
                        }
                    }).zipWith(Flux.just(",").repeat(), (a, b) -> a + b).startWith("[").concatWithValues("]")))
            .response();
Which would then, at least as far as I know, stream the objects to the target as they become available without having to wait for all of them.
Would this kind of behaviour be possible to implement in WebClient, or should I stick with more low-level clients like reactor-netty?
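One detail in the snippet above: zipping every element with "," appends a comma after the last element too, so the body ends in ",]", which strict JSON parsers reject. A minimal JDK-only sketch of the usual fix, putting the separator before every element except the first, is shown below. `JsonArrayFraming` and `asJsonArray` are hypothetical names, and a plain `Iterator` is used as a stand-in for the Flux so that the example has no dependencies.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustration only: hypothetical helper, not part of Reactor or Spring.
class JsonArrayFraming {

    // Wraps an iterator of serialized JSON values so that the emitted chunks concatenate
    // to a valid JSON array: "[" first, a comma before every element except the first,
    // and "]" last. Elements are pulled lazily, one at a time.
    static Iterator<String> asJsonArray(Iterator<String> jsonValues) {
        return new Iterator<>() {
            private boolean opened = false;
            private boolean closed = false;
            private boolean first = true;

            @Override
            public boolean hasNext() {
                return !closed;
            }

            @Override
            public String next() {
                if (closed) {
                    throw new NoSuchElementException();
                }
                if (!opened) {
                    opened = true;
                    return "[";
                }
                if (jsonValues.hasNext()) {
                    String value = jsonValues.next();
                    String chunk = first ? value : "," + value;
                    first = false;
                    return chunk;
                }
                closed = true;
                return "]";
            }
        };
    }

    public static void main(String[] args) {
        StringBuilder body = new StringBuilder();
        asJsonArray(List.of("1", "2", "3").iterator()).forEachRemaining(body::append);
        System.out.println(body); // prints [1,2,3]
    }
}
```

An empty input yields "[]", since the opening and closing brackets are emitted regardless of how many elements arrive.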
Comment From: poutsma
> I'm trying to POST to a server that only accepts application/json
You can change the streamingMediaTypes property of Jackson2JsonEncoder from the default to application/json, and trigger the streaming behavior that way. The reference docs explain how to change codec defaults.
Comment From: micopiira
I see, however it does not produce valid JSON "out of the box". Here's what I tried:
    private final WebClient webClient = WebClient.builder()
            .baseUrl("http://localhost:8080/")
            .codecs(clientCodecConfigurer -> {
                final Jackson2JsonEncoder jackson2JsonEncoder = new Jackson2JsonEncoder();
                jackson2JsonEncoder.setStreamingMediaTypes(List.of(MediaType.APPLICATION_JSON));
                clientCodecConfigurer.defaultCodecs().jackson2JsonEncoder(jackson2JsonEncoder);
            })
            .build();

    Flux<MyEntity> flux = ...;
    webClient.post()
            .uri("/")
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .body(flux, MyEntity.class)
            .retrieve()
            .bodyToMono(MyResponse.class)
This will not wrap the JSON in an array, nor add commas between the items. Should I manually map my Flux<MyEntity> into a Flux<String> with the wrapping [ and ] and commas in between the items?
Comment From: rstoyanchev
Looking at the Javadoc of setStreamingMediaTypes, it's mainly about flushing per item (for event streams that emit periodically) vs a single flush at the end (for continuous streams).
Maybe, if we are in the streaming section and the media type is "application/json" (i.e. explicitly set as a streaming media type), we could simply add the opening and closing square brackets to ensure valid JSON is produced.
We could also switch to flushing at some regularity > 1 (or just leave it to the underlying server buffer), since we know it's a media type that implies continuous writing and shouldn't require explicit flushes. In which case I'm even wondering about removing the non-streaming section entirely and doing this by default, so that we always write Flux items as they come, with flushing as the only difference between streaming and non-streaming media types.
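As a rough illustration of the idea in this comment (not the encoder's actual implementation), the JDK-only sketch below writes array items as they arrive and flushes once every `flushEvery` items plus once at the end. `BatchedArrayWriter`, `Sink`, `RecordingSink`, and `flushEvery` are all hypothetical names; the sink stands in for the underlying connection.

```java
import java.util.Iterator;
import java.util.List;

// Illustration only: hypothetical types, not part of Spring.
class BatchedArrayWriter {

    // Stand-in for a connection that buffers writes until flush() is called.
    interface Sink {
        void write(String chunk);
        void flush();
    }

    // Test double that records what was written and how often flush() was called.
    static class RecordingSink implements Sink {
        final StringBuilder written = new StringBuilder();
        int flushes = 0;

        @Override
        public void write(String chunk) {
            written.append(chunk);
        }

        @Override
        public void flush() {
            flushes++;
        }
    }

    // Writes a valid JSON array element by element, flushing after every
    // flushEvery elements and once more after the closing bracket.
    static void writeArray(Iterator<String> jsonValues, Sink sink, int flushEvery) {
        if (flushEvery < 1) {
            throw new IllegalArgumentException("flushEvery must be >= 1");
        }
        sink.write("[");
        long count = 0;
        while (jsonValues.hasNext()) {
            if (count > 0) {
                sink.write(","); // separator goes before every element except the first
            }
            sink.write(jsonValues.next());
            count++;
            if (count % flushEvery == 0) {
                sink.flush();
            }
        }
        sink.write("]");
        sink.flush(); // final flush so the closing bracket is not left in the buffer
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        writeArray(List.of("1", "2", "3", "4", "5").iterator(), sink, 2);
        System.out.println(sink.written + " flushes=" + sink.flushes); // prints [1,2,3,4,5] flushes=3
    }
}
```

With `flushEvery` set to 1 this behaves like per-item flushing for event streams; larger values approximate the "some regularity > 1" option for continuous streams.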
Comment From: hu553in
@rstoyanchev is there a way to switch this behavior back to the old way now?
I need to temporarily configure Spring WebFlux Kotlin Flow endpoints to have Content-Type: application/json and to be collected to a list before writing...
Because my frontend app is not able to process a stream at the moment.
Comment From: bclozel
@hu553in this issue is about the other way around: avoiding buffering all elements before sending them as a single JSON document from the client. If you're using the server side of WebFlux and an application/json content type, data should not be streamed to the client. Maybe create a new StackOverflow question explaining the problem and showing how you're using this?
Comment From: hu553in
@bclozel done, thank you... :) I'll be glad if you check it, maybe you have a simple answer. I don't think that this is a very complex issue.