Hi all, Using spring 6.0.11, I encountered an issue while trying to use DataBufferUtils with Flux (not Mono). I wanted to be able to gracefully handle errors, and use Flux so I could handle the download of large files without using extra memory.
To get around this, i used my own WritableByteChannelSubscriber, which checks the argument to the hookOnObject method. If it's a RuntimeException, I rethrow it. If it's a DataBuffer, I continue.
Should the Spring-supplied version behave the same way or is there a more elegant way to configure the WebClient to handle it?
I asked a question about this here https://stackoverflow.com/questions/77037515/cant-get-spring-webclient-to-work-flux-databuffers-error-handling-and-header
Comment From: poutsma
Next time, please take the time to provide a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem. A StackOverflow question does not suffice.
I did look at the sample code in your question, and there are a number of oddities in there:
* The response
variable is a Flux<Object>
that can contain DataBuffer
as well as WebClientResponseException
instances, which is odd. Typically, you propagate exceptions as error signals, created using Mono.error
or Flux.error
, so that you end up with a Flux<DataBuffer>
, which—like every reactive stream—can also contain error signals.
* The sample uses exchangeToFlux
in a way to does exactly what retrieve
does, so why not use retrieve
?
* DataBuffer.write
requires a Flux<DataBuffer>
as parameter, so it only operates on DataBuffer
instances and does not accept WebClientResponseException
s. Casting the Flux<?>
to a Flux<DataBuffer>
is not going to change that, so it's not strange that you're getting ClassCastExceptions
.
* DataBufferUtils.write
is a reactive method, because it returns a Mono<Void>
. That result value is not used anywhere, so the mono is not subscribed to, and nothing will be written.
* The Content-Disposition
header is typically not set by servers, but by clients when doing multipart form uploads. In the sample, the header will typically be empty. For a GET request, you can use the filename from the URL instead.
* The sample perform blocking file operations (i.e Files.createTempFile
and File.rename
) in a reactive stream, when you typically want to schedule those on a different scheduler.
* I have no idea why the sample creates two files, then renames the one to the other.
Instead, to store a GET request as file, do something like the following:
Flux<DataBuffer> buffers = webClient.get()
.uri(uri)
.retrieve()
.bodyToFlux(DataBuffer.class);
Mono<Void> result = Mono.defer((() -> {
try {
Path tempFile = Files.createTempFile("file-", "bin");
return DataBufferUtils.write(buffers, tempFile);
}
catch (IOException e) {
return Mono.error(e);
}
}))
.subscribeOn(Schedulers.boundedElastic());
When result
is subscribed to, the result will be written to a temp file.