Spring : 5.3.12 Java : 1.8
I'm trying to stream multipart data, received through a ServerRequest in WebFlux, to another backend service API. The goal is for the data/file to be streamed directly to the backend API without being saved or held in memory. I tried to enable multipart streaming as described in "WebFlux multipart streaming not work", but I'm getting the following exception:
LN="o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler" [c92f5530-3] 500 Server Error for HTTP POST "/"
java.lang.IllegalStateException: No HttpMessageReader for "multipart/form-data" and "org.springframework.http.codec.multipart.Part"
at org.springframework.web.reactive.function.BodyExtractors.lambda$findReader$19(BodyExtractors.java:247)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ com.example.demo.filter.LoggingWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.web.reactive.function.BodyExtractors.lambda$findReader$19(BodyExtractors.java:247)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.springframework.web.reactive.function.BodyExtractors.findReader(BodyExtractors.java:247)
at org.springframework.web.reactive.function.BodyExtractors.lambda$toParts$8(BodyExtractors.java:161)
at org.springframework.web.reactive.function.server.DefaultServerRequest.bodyInternal(DefaultServerRequest.java:167)
at org.springframework.web.reactive.function.server.DefaultServerRequest.body(DefaultServerRequest.java:157)
at com.example.demo.MainController.lambda$2(MainController.java:78)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:173)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:684)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
I'm extracting the Parts and creating a multipart body to upload to the backend using WebClient:
response = serverRequest.body(BodyExtractors.toParts()).collectList().flatMap(mapper -> {
    WebClient webClient = webClientBuilder.build();
    return webClient.post()
            .uri("/service/backend")
            .contentType(MediaType.MULTIPART_FORM_DATA)
            .body(BodyInserters.fromMultipartData(getMultipartBody(mapper)))
            .retrieve();
});
private MultiValueMap<String, HttpEntity<?>> getMultipartBody(List<Part> mapper) {
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    mapper.stream().forEach(s -> {
        if (s instanceof FilePart) {
            builder.asyncPart(s.name(), ((FilePart) s).content(), DataBuffer.class)
                    .filename(((FilePart) s).filename());
        } else if (s instanceof FormFieldPart) {
            builder.asyncPart(s.name(), ((FormFieldPart) s).content(), DataBuffer.class);
        }
    });
    return builder.build();
}
The above code works fine as long as streaming is not enabled for multipart. What am I missing here?
Comment From: poutsma
The Javadoc of the DefaultPartHttpMessageReader.streaming property indicates that "with streaming enabled, the Flux that is produced by this message reader must be consumed in the original order, i.e. the order of the HTTP message. Additionally, the body contents must either be completely consumed or canceled before moving to the next part."
This means that you cannot call collectList, as that will collect all Part instances before consuming the content.
Comment From: jweavers
@poutsma Thanks for looking into this issue. Could you suggest what I'm missing? I tried other options, but I can't seem to find a way.
serverRequest.body(BodyExtractors.toParts()).map(m->m.content()).flatMap(mapper -> {
or
serverRequest.body(BodyExtractors.toParts()).buffer().flatMap(mapper -> {
I tried the following as well, but it makes as many backend calls as there are files in the request. For example, if the multipart request has two parts, it makes two separate backend calls.
serverRequest.multipartData().flatMap(mapper -> {
Or is this not possible with multiple parts/files?
Comment From: poutsma
The truth of the matter is that the current streaming mode of the DefaultPartHttpMessageReader has many restrictions, and only a few operators of the resulting Flux<Part> can be used safely. That's why in version 6.0 of Spring Framework, we are implementing an alternative way of streaming multipart data. See #28006 for more details.