When using the jetty-reactive-httpclient client connector with WebClient, whenever we call retry after WebClient.exchange (to retry the HTTP request), a CancellationException is thrown instead of the request being retried. When using reactor-netty client, the WebClient HTTP request is retried as expected.

It is unclear to us whether this is a Spring issue, or one with Reactor or Jetty. Please feel free to redirect us to the correct project.

Versions: Spring up to 5.1.7.RELEASE, Spring Boot up to 2.1.4.RELEASE, Jetty 9.4.18.v20190429, Reactor 3.2.8.RELEASE

See this sample repository for a full demonstration of the problem: https://github.com/scottjohnson/webclient-retry-repro

In part:

webClient.get()
        .uri("https://postman-echo.com/status/404")
        .exchange()
        .flatMap(cr -> {
            if (!cr.statusCode().is2xxSuccessful()) {
                throw new ResponseStatusException(HttpStatus.BAD_GATEWAY, "Didn't get a 200, retrying...");
            }
            return cr.bodyToMono(String.class);
        })
        .retry(2);

We would expect this code to retry the requested URI two more times. This happens as expected when using the reactor-netty client.

When using jetty-reactive-httpclient, the original request is made, but instead of the request being retried, the following exception is thrown:

java.util.concurrent.CancellationException: null
    at org.eclipse.jetty.reactive.client.internal.AbstractSinglePublisher.subscribe(AbstractSinglePublisher.java:54) ~[jetty-reactive-httpclient-1.0.3.jar:na]
    at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:43) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:109) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:93) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:122) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at org.eclipse.jetty.reactive.client.internal.AbstractSingleProcessor.downStreamOnNext(AbstractSingleProcessor.java:110) ~[jetty-reactive-httpclient-1.0.3.jar:na]
    at org.eclipse.jetty.reactive.client.internal.ResponseListenerPublisher.onNext(ResponseListenerPublisher.java:130) ~[jetty-reactive-httpclient-1.0.3.jar:na]
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at org.eclipse.jetty.reactive.client.internal.ResponseListenerPublisher.onHeaders(ResponseListenerPublisher.java:72) ~[jetty-reactive-httpclient-1.0.3.jar:na]
    at org.eclipse.jetty.client.ResponseNotifier.notifyHeaders(ResponseNotifier.java:98) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.ResponseNotifier.notifyHeaders(ResponseNotifier.java:90) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.HttpReceiver.responseHeaders(HttpReceiver.java:267) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.headerComplete(HttpReceiverOverHTTP.java:256) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.http.HttpParser.parseFields(HttpParser.java:1218) ~[jetty-http-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1502) ~[jetty-http-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:172) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:135) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:73) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:133) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:155) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:427) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:321) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:159) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]

Comment From: sdeleuze

It looks like a potential https://github.com/jetty-project/jetty-reactive-httpclient issue.

@scottjohnson Could you please try to reproduce using only jetty-reactive-httpclient API + Reactor retry operator?

Comment From: sbordet

@sdeleuze what I see analyzing this on the jetty-reactive-httpclient side is that the exception causes cancel() to be invoked on a subscription, and I don't see how a canceled subscription can be reused (I could not find any explicit behavior described by the Reactive Streams spec).

Isn't the current behavior violating Rule 2.12?

In this particular case, you want to send another request, so I'd say that you have to run again through the whole process of creating a request, which would re-create new publishers, subscribers and subscriptions so that they all are different objects than the previous request (and therefore you won't see the CancellationException).

In other words, what are the semantics of retry() in this case:

AtomicInteger unique = new AtomicInteger();
webClient.get()
    .uri("http://localhost/" + unique.incrementAndGet())
    .exchange()
    .flatMap(r -> {
        throw new ResponseStatusException(HttpStatus.OK);
    })
    .retry(1)

Is there a request to /1 and then a request to /2 for the retry?

If not, what if the first request is a POST that consumes content that is not re-consumable (e.g. it reads from an InputStream that provides the request content only once)? The retry won't be able to send the content a second time.

In general, a call to Jetty's ReactiveRequest.response(...) is a once-only operation (per request instance), and whatever is passed to it or returned by it cannot be reused.
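To make the once-only point concrete, here is a minimal plain-JDK sketch. It does not use the Jetty, Spring or Reactor APIs: `OneShotCall` and `runWithRetry` are hypothetical names invented for illustration. It only shows the difference between retrying against the same single-use object and building a fresh one, with a freshly evaluated URI, for every attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetrySketch {
    /** Hypothetical stand-in for a once-only operation: send() may run once per instance. */
    static final class OneShotCall {
        private final String uri;
        private final List<String> log;
        private boolean used;

        OneShotCall(String uri, List<String> log) {
            this.uri = uri;
            this.log = log;
        }

        void send() {
            if (used) {
                throw new IllegalStateException("already used: " + uri);
            }
            used = true;
            log.add(uri); // record that a request to this URI went out
            // Mimic the failing handler from the thread: every attempt fails.
            throw new RuntimeException("failed: " + uri);
        }
    }

    /** Runs one attempt plus up to `retries` more; returns the last failure, or null on success. */
    static RuntimeException runWithRetry(Supplier<OneShotCall> factory, int retries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                factory.get().send();
                return null;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Same instance for every attempt: the retry trips the once-only guard.
        List<String> reusedLog = new ArrayList<>();
        AtomicInteger c1 = new AtomicInteger();
        OneShotCall single = new OneShotCall("/" + c1.incrementAndGet(), reusedLog);
        RuntimeException e1 = runWithRetry(() -> single, 1);
        System.out.println(reusedLog + " -> " + e1.getMessage()); // [/1] -> already used: /1

        // Fresh instance per attempt: the URI expression is re-evaluated each time.
        List<String> freshLog = new ArrayList<>();
        AtomicInteger c2 = new AtomicInteger();
        RuntimeException e2 =
                runWithRetry(() -> new OneShotCall("/" + c2.incrementAndGet(), freshLog), 1);
        System.out.println(freshLog + " -> " + e2.getMessage()); // [/1, /2] -> failed: /2
    }
}
```

In the first case only /1 is ever requested and the retry fails on the guard; in the second, the retry produces a new request to /2. Which of the two a given client connector does depends on where the request object is created relative to the point of subscription.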

Thanks!

Comment From: scottjohnson

Thanks @sbordet for weighing in. This analysis matches our observation that the subscription was being reused after it had been cancelled.

@sdeleuze, we weren't able to effectively create a test on our end that reproduces using only the jetty-reactive-httpclient API; do you have any examples of component tests at a low level that I could adapt for this purpose?

Comment From: sbordet

@scottjohnson I have a reproducer; instructions are below.

Be sure you have the dependency on spring-webflux:

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webflux</artifactId>
  <version>5.1.8.RELEASE</version>
  <scope>test</scope>
</dependency>

Then the test class with this test:

@Test
public void testRetry() throws Exception {
    // Set up the server here.

    AtomicInteger unique = new AtomicInteger();
    WebClient client = WebClient.builder().clientConnector(new JettyClientHttpConnector(httpClient())).build();
    client.get()
        .uri(uri() + "/" + unique.incrementAndGet())
        .exchange()
        .flatMap(r -> {
            throw new ResponseStatusException(HttpStatus.OK);
        })
        .retry(1)
        .block();
}

Comment From: scottjohnson

Hi, is there any progress on this issue?

Comment From: scottjohnson

Hi, just checking in if there is any progress on this issue? Thanks!

Comment From: rstoyanchev

It looks like this was fixed as a side effect of #22375. Because the exchange is now wrapped with Mono#defer, a retry after cancellation invokes JettyClientHttpConnector again, causing a new request to be created.

I confirmed that starting with 5.2 this is no longer an issue. Nevertheless I do want to follow up on the following:

> @sdeleuze what I see analyzing this on the jetty-reactive-httpclient side is that the exception causes cancel() to be invoked on a subscription, and I don't see how a canceled subscription can be reused (I could not find any explicit behavior described by the Reactive Streams spec).
>
> Isn't the current behavior violating Rule 2.12?

The subscription for the original subscriber is indeed cancelled. However, on the retry, AbstractSinglePublisher#subscribe is called with a new Subscriber, and that can happen many times as per Rule 1.10.
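As a rough illustration of that reading, the sketch below uses only the JDK's java.util.concurrent.Flow interfaces. `ColdPublisher` and `Recorder` are hypothetical classes written for this example and are not how jetty-reactive-httpclient or Reactor Netty are implemented. The publisher hands each new Subscriber its own Subscription, so cancelling the first has no effect on the second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ResubscribeSketch {
    /** Hypothetical cold publisher: each subscribe() call gets its own Subscription and state. */
    static final class ColdPublisher implements Flow.Publisher<String> {
        private int subscriptions;

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            final int id = ++subscriptions;
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Simplification: a non-positive n is ignored rather than signalled as an error.
                    if (done || n <= 0) {
                        return;
                    }
                    done = true;
                    subscriber.onNext("response #" + id);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true; // affects this subscription only
                }
            });
        }
    }

    /** Records the signals it sees; optionally cancels right away instead of requesting. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();
        private final boolean cancelImmediately;

        Recorder(boolean cancelImmediately) {
            this.cancelImmediately = cancelImmediately;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (cancelImmediately) {
                subscription.cancel();
                signals.add("cancelled");
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onNext(String item) {
            signals.add(item);
        }

        @Override
        public void onError(Throwable error) {
            signals.add("error: " + error);
        }

        @Override
        public void onComplete() {
            signals.add("complete");
        }
    }

    public static void main(String[] args) {
        ColdPublisher publisher = new ColdPublisher();
        Recorder first = new Recorder(true);   // stands in for the cancelled original attempt
        Recorder second = new Recorder(false); // stands in for the retry
        publisher.subscribe(first);
        publisher.subscribe(second);
        System.out.println(first.signals);  // [cancelled]
        System.out.println(second.signals); // [response #2, complete]
    }
}
```

A publisher that instead keeps a single shared, cancellable state across subscribe() calls would fail the second subscriber, which is the behaviour seen in the stack trace.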

For comparison, in Reactor Netty, a cancellation is interpreted as a request to close the connection, while each additional Subscriber is a request for a new connection.

Comment From: scottjohnson

Thanks for all your efforts @rstoyanchev. Much appreciated!