Hello!
I have a simple application that acts as a proxy between a client and a server It receives a client request and passes it over to the server, then passes the server response back to the client There is no need to process the request/response body, or buffer them in memory Here is my current code:
webClient
.method(request.method)
.uri(request.path)
.headers { it.addAll(request.headers().asHttpHeaders()) }
.body(request.bodyToFlux<DataBuffer>())
.exchangeToMono { response ->
ServerResponse
.status(response.statusCode())
.headers { it.addAll(response.headers().asHttpHeaders()) }
.body(response.bodyToFlux<DataBuffer>())
}
After upgrading to WebFlux 5.3.0 I had to replace the .exchange()
method with .exchangeToMono()
. This new method in turn calls .releaseIfNotConsumed()
, and apparently passing the body as a Flux<DataBuffer>
is not consuming it, so my client always gets a response with an empty body.
What is the proper way to address my use case?
P.S. I created an issue here due to this request
Comment From: bclozel
Closing in favor of (if I'm not mistaken) your question on StackOverflow.
Comment From: bclozel
Sorry, I didn't see Rossen's comment. Reopening.
Comment From: rstoyanchev
Yes the new exchangeToMono
and exchangeToFlux
methods expect the body to be decoded inside the callback. However in this case it is only wrapped as ServerResponse
and decoded later when that response is written.
The above should be straight forward with .retrieve()
but there is currently no option for ResponseEntity<Flux<T>>
. There is only an option for ResponseEntity<List<T>>
but it should be the other way around. You can get always get a List<T>
from a Flux
while having only List<T>
imposes buffering.
Now that exchange()
is deprecated in 5.3 with #25751, having an option for retrieve().entityToFlux(..)
looks essential in order to allow decoding the body in any way while also having access to the response status and headers.
Comment From: mplain
@rstoyanchev Thank you for the quick response! 👍
If I may, a question regarding the implementation of .toEntityFlux()
There's a class WebClientUtils, it contains the underlying logic for .toEntity()
and .toEntityList()
, shared by both WebClient and ClientResponse
You did not add the logic of .toEntityFlux()
to that class. Is that intentional? In order to reduce the incentive to use .exchange()
?
Perhaps .exchangeToFlux(ClientResponse::toEntityFlux)
could be useful...
(I noticed because I wanted to submit the corresponding Kotlin extension functions)
Comment From: rstoyanchev
@mplain I see no need for an additional method in WebClientUtils
since the toEntityFlux
methods are currently only exposed from one place and the logic is trivial.
Comment From: robotmrv
@rstoyanchev
Is not toEntityFlux()
method unsafe the same way as exchange()
? (but in addition, to transfer real response you need to override default onStatus(Predicate, Function)
)
Why not just to un-deprecate exchange()
as it fits current case?
Comment From: mplain
@robotmrv My guess is that when users would use toEntityFlux
, they'd intend to consume the flux
But currently when users use exchange
, they often dont think about the body at all
So it's more about helping people not make accidental mistakes, and not about tying their hands
Any method exposing the flux of data buffers would need to be the user's responsibility...
Comment From: rstoyanchev
Indeed as @mplain said it's about providing more structure and avoiding accidental mistakes. The exchange()
method is too wide open and in most cases it's used unnecessarily. For .retrieve()
you have to specify how to decode the body and at this point in most cases you likely not use toEntityFlux
. Even if you do, error responses are taken care of, and it's obvious that you need to consume the body. None of that is the case with exchange.
Comment From: mplain
@rstoyanchev, actually, looking at this code, where are the exceptions taken care of?
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(Class<T> elementType) {
return this.responseMono.map(response ->
ResponseEntity.status(response.rawStatusCode())
.headers(response.headers().asHttpHeaders())
.body(response.bodyToFlux(elementType)));
}
Comparing to another method:
public <T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementClass) {
return this.responseMono.flatMap(response ->
WebClientUtils.mapToEntityList(response,
handleBodyFlux(response, response.bodyToFlux(elementClass))));
}
Exceptions are handled in the handleBodyFlux
method, which is not used in toEntityFlux
...?
Comment From: rstoyanchev
You're right @mplain and I've created #26069.
Comment From: HaloFour
This deprecation of exchange()
is also affecting my projects where I'm using a custom BodyExtractor to read a very large JSON array as a Flux<T>
on which I transform the elements individually and return them as a Flux<T>
to the client, to avoid having to buffer the data in memory. I also need to pass along any etag
header from the upstream service back down to the client, so I end up translating the ClientResponse
into a POJO that contains a Flux<T>
of the body.
As far as I can tell I can't do this with exchangeToMono
or exchangeToFlux
. It continues to work with the deprecated exchange
but I assume it's deprecated because you intend to remove it in the future.
For more info, see: https://github.com/spring-projects/spring-framework/issues/24951
Comment From: rstoyanchev
As far as I can tell I can't do this with exchangeToMono or exchangeToFlux
Can you clarify why?
Comment From: HaloFour
@rstoyanchev
Can you clarify why?
A simple attempt at refactoring is yielding an empty response body. I haven't figured out why and it's quite likely that I'm doing it wrong.
I'm trying to take the following:
.exchange()
.flatMap(response -> {
if (statusCode.isError()) {
return response.createException().flatMap(Mono::error);
} else {
var headers = response.headers().asHttpHeaders();
var pointer = JsonPointer.compile("/response/entities");
var body = response.body(streamingJsonBodyExtractor.toFlux(Entity.class, pointer));
return Mono.just(new ResponseWrapper(headers, body));
}
});
and I've attempted to refactor it into the following:
.exchangeToMono(response -> {
if (statusCode.isError()) {
return response.createException().flatMap(Mono::error);
} else {
var headers = response.headers().asHttpHeaders();
var pointer = JsonPointer.compile("/response/entities");
var body = response.body(streamingJsonBodyExtractor.toFlux(Entity.class, pointer));
return Mono.just(new EntitiesResponse(headers, body));
}
});
If I instead use exchangeToFlux
and flow the Flux<Entity>
to the response it appears to work, but I need to capture and communicate etag and cache headers to the client as well, which is why I'm wrapping the headers and body in this Mono<EntitiesResponse>
.
Comment From: rstoyanchev
This is what the toEntityFlux
methods introduced for this issue are for, if you need to wrap the decoded response.
That said what actually consumes the body downstream? I ask is wrapping like that and letting it be consumed later opens a possibility that it may not be consumed at all, for example in case of a cancellation signal.
Comment From: HaloFour
@rstoyanchev
This is what the
toEntityFlux
methods introduced for this issue are for, if you need to wrap the decoded response.
As far as I can tell I can't use those methods as they expect that the response being decoded is in a very specific format. In my case the response is application/json
where a very large array of objects is embedded within a wrapper object. I would need a similar entityTo*
method that could accept an arbitrary BodyExtractor<T>
.
That said what actually consumes the body downstream?
The controller returns a Flux<T>
and streams the response to the client.
Comment From: HaloFour
This is probably fodder for another issue but could I propose some combination of the following method(s) on ResponseSpec
as well?
<T> Mono<ResponseEntity<T>> toEntity(BodyExtractor<T> extractor);
<T> Mono<ResponseEntity<T>> toEntity(Function<ClientResponse, Mono<T>> function);
<T> Mono<ResponseEntity<T>> toEntityFlux(Function<ClientResponse, Flux<T>> function);