Affects: Spring 6.0.0-RC2

I tried out the new PartEvent feature in Spring 6 and created an example project.

FileUploadController has a /partevents endpoint that uses the new PartEvent API to handle an uploaded multipart form.

The endpoint should return the value of the name form field and the filename of the uploaded file in the HTTP response.

@PostMapping("partevents")
  public ResponseEntity<Flux<Object>> handlePartsEvents(@RequestBody Flux<PartEvent> allPartsEvents) {
      var result = allPartsEvents
              .windowUntil(PartEvent::isLast)
              .concatMap(p -> {
                          log.debug("concatMap boundary::");
                          return p.switchOnFirst((signal, partEvents) -> {
                                      if (signal.hasValue()) {
                                          PartEvent event = signal.get();
                                          if (event instanceof FormPartEvent formEvent) {
                                              String value = formEvent.value();
                                              // handle form field
                                              log.debug("form value: {}", value);
                                              return Mono.just(value);

                                          } else if (event instanceof FilePartEvent fileEvent) {
                                              String filename = fileEvent.filename();
                                              log.debug("upload file name:{}", filename);

                                              return Mono.just(filename);
                                          }

                                          // neither a form field nor a file part
                                          return Mono.error(new RuntimeException("Unexpected event: " + event));

                                      }

                                      log.debug("return default flux");
                                      return partEvents; // either complete or error signal
                                  }
                          );
                      }
              );

      return ok().body(result);
  }

I also wrote an integration test that uses WebClient to verify it, but the test fails.

@Test
public void testPartEvents() throws Exception {
    this.client
            .post().uri("/partevents")
            .contentType(MULTIPART_FORM_DATA)
            .body(
                    Flux.concat(
                            FormPartEvent.create("name", "test"),
                            FilePartEvent.create("file", new ClassPathResource("spring.png"))
                    ),
                    PartEvent.class
            )
            .exchangeToFlux(clientResponse -> {
                        assertThat(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
                        return clientResponse.bodyToFlux(String.class);
                    }
            )
            .as(StepVerifier::create)
//                .consumeNextWith(it-> assertThat(it).isEqualTo("test"))
//                .consumeNextWith(it ->assertThat(it).isEqualTo("spring.png"))
            .expectNextCount(2)
            .verifyComplete();
}

The complete error stack trace can be found here: https://github.com/hantsy/spring6-sandbox/actions/runs/3350695924/jobs/5551671488#step:4:9421

However, the controller unit test using WebTestClient passes. I also used curl to send a request to the running application, and it worked as expected.

Comment From: poutsma

Thanks for checking out this new API! There are a couple of things causing the failure.

  • You are using Reactor Netty 2.0.0-M2, which is still in the milestone phase. Changing that to 1.0.24 got me a lot further.
  • handlePartsEvents has a bug. The PartEvent Javadoc says that

File uploads will produce one or more FilePartEvents, ...

In your code, you're only handling the first event of each part, not any subsequent ones. At the very least, you have to dispose of the contents of the following events, otherwise you'll end up with stale connections. So do something like:

return partEvents.map(PartEvent::content)
    .map(DataBufferUtils::release)
    .then(Mono.just(filename));
  • There was a bug in Spring Framework in the way PartEvents were written, causing issues with the JdkClientHttpConnector. None of the other client connectors had this problem. This bug has now been fixed.

Comment From: hantsy

@poutsma My example is copied from the official Spring documentation; see PartEvent.

in your code, you're only handling the first event, not any subsequent ones.

windowUntil splits the original flux into a Flux<Flux<PartEvent>> based on PartEvent.isLast. Every PartEvent (I have checked FormPartEvent and FilePartEvent) has an isLast method, which ensures that the events of the form field and of the file are split into separate Fluxes correctly.

Finally, concatMap collects the results (form value and file name) into the result Flux.
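
To make the windowing argument concrete, here is a simplified plain-Java model of it. It does not use Reactor; the Event record and the windowUntilLast helper are hypothetical stand-ins for PartEvent and windowUntil(PartEvent::isLast), and the number of events per file is an assumption chosen for illustration. It shows that the windows do separate the parts correctly, but that the window for a file can hold more than one event.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowUntilModel {
    // Hypothetical stand-in for PartEvent: the part name plus the isLast flag.
    record Event(String part, boolean isLast) {}

    // Groups events into windows; a window is closed after an event whose isLast is true.
    static List<List<Event>> windowUntilLast(List<Event> events) {
        List<List<Event>> windows = new ArrayList<>();
        List<Event> current = new ArrayList<>();
        for (Event e : events) {
            current.add(e);
            if (e.isLast()) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            windows.add(current);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Assumption: one event for the form field, three events for the file.
        List<Event> events = List.of(
                new Event("name", true),
                new Event("file", false),
                new Event("file", false),
                new Event("file", true));
        for (List<Event> window : windowUntilLast(events)) {
            System.out.println(window.get(0).part() + ": " + window.size() + " event(s)");
        }
    }
}
```

In this model the file window contains three events. Reading only the first one yields the filename, but the remaining events in that window still carry content that has to be dealt with.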

Comment From: poutsma

@poutsma My example is copied from the Spring official blogs.

Indeed. You might want to check the author of said documentation (and the PartEvent API) before you ignore their suggestion.

In the PartEvent Javadoc it says:

NOTE that the body contents must be completely consumed, relayed, or released to avoid memory leaks.

In your sample, you are not handling the body content. The code snippet I gave above fixes that by releasing it.
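
As a rough illustration of the Javadoc note, the following plain-Java sketch models why every event's content needs to be released. The Buffer class is a hypothetical stand-in for a pooled DataBuffer, not the Spring type, and the count of three buffers is an assumption.

```java
import java.util.List;

public class ReleaseModel {
    // Hypothetical stand-in for a pooled buffer that stays allocated until released.
    static final class Buffer {
        private boolean released;

        void release() {
            released = true;
        }

        boolean isReleased() {
            return released;
        }
    }

    // Counts the buffers that were never released, i.e. the ones that would leak.
    static long countUnreleased(List<Buffer> buffers) {
        return buffers.stream().filter(b -> !b.isReleased()).count();
    }

    public static void main(String[] args) {
        // Assumption: a file upload that arrived as three events, one buffer each.
        List<Buffer> fileContents = List.of(new Buffer(), new Buffer(), new Buffer());

        // Reading only the filename touches none of the buffers.
        System.out.println("unreleased before: " + countUnreleased(fileContents));

        // Releasing the content of every event, as the suggested snippet does.
        fileContents.forEach(Buffer::release);
        System.out.println("unreleased after: " + countUnreleased(fileContents));
    }
}
```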

Comment From: hantsy

OK, updated my example.

  • Use stable Reactor Netty 1.
  • Use the Reactor HTTP connector instead in the WebClient in IntegrationTests.
  • Update the file part handling as you suggested (consuming and releasing the data).

IntegrationTests still fails, with messages like the following:

https://github.com/hantsy/spring6-sandbox/actions/runs/3371019674/jobs/5592675195#step:4:10358

Decoded "["test","spring.png"]"
Error:  Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.685 s <<< FAILURE! - in com.example.demo.IntegrationTests
Error:  com.example.demo.IntegrationTests.testPartEvents  Time elapsed: 0.501 s  <<< FAILURE!
java.lang.AssertionError: expectation "expectNextCount(2)" failed (expected: count = 2; actual: counted = 1; signal: onComplete())

Comment From: poutsma

That failure occurs because the response is encoded as a JSON list. Spring picked JSON as the response type because the return type of handlePartsEvents is ResponseEntity<Flux<Object>>, which does not give sufficient information. If you change it to ResponseEntity<Flux<String>>, the response will be encoded as the single string testspring.png, which still fails the test. To make the test succeed, you need to make sure that the response strings are decoded into separate events, and you do that by adding a newline delimiter. So return Mono.just(value + "\n") instead of Mono.just(value), and similarly for the FilePartEvent. With those changes, the test succeeds for me.
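
The effect of the newline delimiter can be sketched in plain Java. This is a simplified model, not the actual Spring string decoder: the decode helper is hypothetical and assumes the client splits the response body on "\n" only.

```java
import java.util.Arrays;
import java.util.List;

public class NewlineDelimiterModel {
    // Concatenates the emitted strings into one body, then splits it on newlines,
    // modelling a client that decodes one element per line.
    static List<String> decode(List<String> emitted) {
        String body = String.join("", emitted);
        return Arrays.stream(body.split("\n"))
                .filter(s -> !s.isEmpty())
                .toList();
    }

    public static void main(String[] args) {
        // Without delimiters the two values run together into one element.
        System.out.println(decode(List.of("test", "spring.png")));
        // With a trailing newline each value becomes its own element.
        System.out.println(decode(List.of("test\n", "spring.png\n")));
    }
}
```

Under this model the first call yields a single element, which matches the counted = 1 in the failing expectNextCount(2), and the second call yields two elements.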