Version: Spring Boot 2.1.4.RELEASE

I use WebClient to proxy requests to an upstream. When the server is not Netty (Undertow, for example), java.lang.IllegalStateException: COMPLETED occurs under high load.

java.lang.IllegalStateException: COMPLETED
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:419) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoCollect.subscribe(MonoCollect.java:66) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:146) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
    at reactor.netty.http.client.HttpClientConnect$HttpObserver.onStateChange(HttpClientConnect.java:425) ~[reactor-netty-0.8.6.RELEASE.jar:0.8.6.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onStateChange(PooledConnectionProvider.java:511) ~[reactor-netty-0.8.6.RELEASE.jar:0.8.6.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider$PooledConnection.onStateChange(PooledConnectionProvider.java:453) ~[reactor-netty-0.8.6.RELEASE.jar:0.8.6.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:112) ~[reactor-netty-0.8.6.RELEASE.jar:0.8.6.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:210) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:196) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:189) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:210) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:196) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:189) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:414) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:213) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:210) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:196) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:189) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1396) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:210) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:196) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:906) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:311) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:341) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:665) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) ~[netty-common-4.1.34.Final.jar:4.1.34.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]

Here is the code:

@Controller
public class SimpleProxyController{

    WebClient webClient = WebClient.builder().build();

    @RequestMapping("proxy")
    public Mono<Void> proxy(ServerWebExchange exchange){
        Thread thread = Thread.currentThread();
        Flux<DataBuffer> content = webClient
                .method(exchange.getRequest().getMethod())
                .uri("http://localhost:8080/upstream")
                .body(exchange.getRequest().getBody(),DataBuffer.class)
                .exchange()
                .flatMapMany(resp -> resp.bodyToFlux(DataBuffer.class));
        content = content.publishOn(Schedulers.fromExecutor((Executor) thread));
        return exchange.getResponse().writeWith(content);
    }

    @RequestMapping("upstream")
    @ResponseBody
    public Flux<String> upstream(){
        return Flux.just("ok");
    }
}

Code file: issue-reproduce.tar.gz

Steps to reproduce:
- Start the server.
- Use wrk to send requests: wrk -t 50 -c 5000 http://localhost:8080/proxy -d 1000
- Wait a minute; the exception occurs.

I guess the exception occurs because WebClient retries on some IOException, which causes the body (a Flux) to be subscribed to again, while the state of AbstractListenerReadPublisher has already become COMPLETED after the first subscription. How can I resolve this?

Comment From: rstoyanchev

I guess the exception occurs because WebClient retries on some IOException,

I don't know what you mean. Maybe Reactor Netty has some retry logic when establishing the connection initially, but after that I'm not aware of any auto-reconnect logic. Nothing in WebClient for sure.

Generally writing to the response can only be done once so for any retry to work (let's say you wanted a retry) you'd have to insert it into the WebClient call before writing to the response. In other words it can retry if it fails to connect but once you start reading and writing to the client, the response is committed.
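The ordering described above can be sketched without Spring at all. The following is a JDK-only model, not WebClient or Reactor Netty code: `RetryBeforeCommit`, `Response`, and `proxy` are hypothetical names invented for illustration, and the upstream call is reduced to a `Supplier<String>`. It only shows that retries stay safe as long as nothing has been written to the response yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** JDK-only model (hypothetical names): retry the upstream call, then write once. */
class RetryBeforeCommit {

    /** Stand-in for a server response; the first write commits it. */
    static final class Response {
        private boolean committed;
        private final List<String> written = new ArrayList<>();

        void write(String chunk) {
            committed = true;
            written.add(chunk);
        }

        boolean isCommitted() { return committed; }

        List<String> written() { return written; }
    }

    /**
     * Calls the upstream up to maxAttempts times. The response is not touched
     * until one attempt has succeeded, so a failed attempt can be retried.
     */
    static void proxy(Supplier<String> upstream, int maxAttempts, Response response) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String body;
            try {
                body = upstream.get();      // may throw, e.g. a connect failure
            } catch (RuntimeException ex) {
                last = ex;                  // response is still uncommitted here
                continue;
            }
            response.write(body);           // commit happens only after success
            return;
        }
        throw new IllegalStateException(
                "upstream failed after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("connect failed");
            }
            return "ok";
        };
        Response response = new Response();
        proxy(flaky, 5, response);
        System.out.println(calls[0] + " attempts, wrote " + response.written());
    }
}
```

In a real WebFlux handler the equivalent would be a retry operator placed on the WebClient call itself, upstream of writeWith; the exact operator to use is left open here.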

One side comment that may or may not be related:

Thread thread = Thread.currentThread();
...
content = content.publishOn(Schedulers.fromExecutor((Executor) thread));

Why do you need to do that? The response write is non-blocking. There should be no need to switch back to a different thread. I'm not sure it's okay to use the server's worker threads like that since the server itself is also managing those threads and using them for incoming requests. Maybe there is something in Undertow that makes it work but it shouldn't be necessary in the first place.

Comment From: gung-rgb

@rstoyanchev Thank you for the reply. First, the line:

content = content.publishOn(Schedulers.fromExecutor((Executor) thread));

is not related to the exception. That code is only there to avoid FixedLengthOverflowException; see https://github.com/spring-projects/spring-framework/issues/22690

Generally writing to the response can only be done once so for any retry to work (let's say you wanted a retry) you'd have to insert it into the WebClient call before writing to the response. In other words it can retry if it fails to connect but once you start reading and writing to the client, the response is committed

Yes. In fact, reading the request body and writing it to the client has not started yet, because the client connection failed. But a subscriber has already been registered on the request body, which causes the state of AbstractListenerReadPublisher to change from UNSUBSCRIBED and eventually reach COMPLETED. So if the connect retry triggers subscribe again while the state is COMPLETED, that causes the IllegalStateException.

        // In any state other than UNSUBSCRIBED, subscribe throws IllegalStateException
        <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
            throw new IllegalStateException(toString());
        }

        // But the first connect attempt subscribes, which moves the state away from
        // UNSUBSCRIBED, so subscribing a second time is not allowed.
        UNSUBSCRIBED {
            @Override
            <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
                Assert.notNull(publisher, "Publisher must not be null");
                Assert.notNull(subscriber, "Subscriber must not be null");
                if (publisher.changeState(this, SUBSCRIBING)) {
                    Subscription subscription = publisher.createSubscription();
                    publisher.subscriber = subscriber;
                    subscriber.onSubscribe(subscription);
                    publisher.changeState(SUBSCRIBING, NO_DEMAND);
                    // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
                    String logPrefix = publisher.getLogPrefix();
                    if (publisher.completionBeforeDemand) {
                        rsReadLogger.trace(logPrefix + "Completed before demand");
                        publisher.state.get().onAllDataRead(publisher);
                    }
                    Throwable ex = publisher.errorBeforeDemand;
                    if (ex != null) {
                        if (rsReadLogger.isTraceEnabled()) {
                            rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
                        }
                        publisher.state.get().onError(publisher, ex);
                    }
                }
                else {
                    throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
                            "subscriber: " + subscriber);
                }
            }

There may be nothing wrong with that, but it means I cannot use the reactive request body (with the server on Undertow) as the body sent by WebClient for this kind of proxying. Yet with Reactor Netty as the server this approach works well for some reason, which confuses me.
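The one-shot behaviour in the excerpt above can be reproduced in a few lines of plain Java. This is a simplified model, not the Spring class: `OneShotBody` and `OneShotBodyDemo` are hypothetical names, the state list is cut down to four values, and the subscriber is reduced to a `Runnable`. It only demonstrates why a second subscribe after completion fails with the state name as the message.

```java
import java.util.concurrent.atomic.AtomicReference;

/** JDK-only model (hypothetical names) of a body that accepts one subscriber. */
class OneShotBody {

    enum State { UNSUBSCRIBED, SUBSCRIBING, NO_DEMAND, COMPLETED }

    private final AtomicReference<State> state =
            new AtomicReference<>(State.UNSUBSCRIBED);

    /** Succeeds only from UNSUBSCRIBED; every other state rejects the caller. */
    void subscribe(Runnable onSubscribe) {
        if (state.compareAndSet(State.UNSUBSCRIBED, State.SUBSCRIBING)) {
            onSubscribe.run();
            state.compareAndSet(State.SUBSCRIBING, State.NO_DEMAND);
        } else {
            // Same shape as the default branch quoted above:
            // the exception message is the name of the current state.
            throw new IllegalStateException(state.get().toString());
        }
    }

    /** Simulates the body having been fully read. */
    void complete() {
        state.set(State.COMPLETED);
    }

    State state() {
        return state.get();
    }
}

class OneShotBodyDemo {
    public static void main(String[] args) {
        OneShotBody body = new OneShotBody();
        body.subscribe(() -> System.out.println("first subscriber attached"));
        body.complete();
        try {
            // A second subscription, for example from a repeated send attempt.
            body.subscribe(() -> System.out.println("never printed"));
        } catch (IllegalStateException ex) {
            System.out.println("second subscribe rejected: " + ex.getMessage());
        }
    }
}
```

Whatever triggers the second subscription in the real application, the model shows that the publisher itself cannot be replayed; the body would have to be buffered somewhere else if it needs to be sent more than once.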

Comment From: rstoyanchev

Yes. In fact, reading the request body and writing it to the client has not started yet, because the client connection failed. But a subscriber has already been registered on the request body

One thing that does not make sense is that any reconnect by Reactor Netty at the TCP level should be transparent to the WebClient. Further the writeWith in AbstractServerHttpResponse uses the ChannelSendOperator to defer the write until at least one item is fetched, or otherwise propagate the error so I'm not yet sure how such a reconnect explains what's going on.
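The deferral idea mentioned above, fetch at least one item before writing and otherwise propagate the error, can be illustrated with a plain iterator. This is a rough JDK-only sketch under stated assumptions, not the ChannelSendOperator implementation: `DeferredWrite`, `Response`, and this `writeWith` are hypothetical, the source is a synchronous `Iterator<String>`, and an empty source simply writes nothing, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** JDK-only sketch (hypothetical names): commit only after a first item arrives. */
class DeferredWrite {

    /** Stand-in for a server response; the first write commits it. */
    static final class Response {
        private boolean committed;
        private final List<String> written = new ArrayList<>();

        void write(String chunk) {
            committed = true;
            written.add(chunk);
        }

        boolean isCommitted() { return committed; }

        List<String> written() { return written; }
    }

    /**
     * Pulls the first item before touching the response. If fetching it throws,
     * the exception propagates and the response is still uncommitted.
     */
    static void writeWith(Iterator<String> source, Response response) {
        if (!source.hasNext()) {
            return;                         // simplification: nothing to write
        }
        String first = source.next();       // may throw before any write
        response.write(first);              // the response is committed here
        while (source.hasNext()) {
            response.write(source.next());
        }
    }

    public static void main(String[] args) {
        Response response = new Response();
        List<String> chunks = new ArrayList<>();
        chunks.add("o");
        chunks.add("k");
        writeWith(chunks.iterator(), response);
        System.out.println("committed=" + response.isCommitted()
                + ", wrote " + response.written());
    }
}
```

Under this model, an error raised while obtaining the first item reaches the caller with the response untouched, which is why a failure at that stage should surface as an error signal and not as a half-written response.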

Comment From: gung-rgb

Maybe you are right. That a reconnect causes this issue is just my guess, and I do not know what is really happening. But the exception really does occur, and the sender is invoked twice when it occurs. I do not know how to avoid this exception. I am sure that if you run the project you can reproduce it with wrk. 😊 issue-reproduce.tar.gz

Comment From: rstoyanchev

I can't reproduce the issue with the given sample and version. Also since the issue was created there has been a related fix #22467.

I'm closing for now, but feel free to comment if it is still an issue.