Kotlin version: 1.5.30 Spring Boot version: 2.5.4 and 2.6.1 Spring Framework version: 5.3.9

When making multiple WebClient requests within a Kotlin coroutine, if one of them fails with a 4xx or 5xx status, there are two errors logged... an expected WebClientResponseException and an unexpected IllegalStateException: Only one connection receive subscriber allowed from Reactor. A unit test to reproduce the behavior along with the full stack trace are posted below.

There may be other ways to reproduce this error, but in the case below, it only happens if the request is expecting a body and the 4xx response does not contain one. Also, if you comment out the /path1 request, the /path2 request fails normally without the additional Reactor exception, even if the 4xx response doesn't contain a body.

Unit test (spring-cloud-contract-wiremock is used to mock the server):

@SpringBootTest
@AutoConfigureWireMock(port = 8089)
class WebClientTest {

    private lateinit var webClient: WebClient

    @BeforeEach
    fun setup() {
        webClient = WebClient.builder().baseUrl("http://localhost:8089").build()

        stubFor(
            get(urlPathEqualTo("/path1")).willReturn(
                aResponse().withStatus(200)
                    .withHeader("Content-Type", "application/json")
                    .withBody("""{"data":""}""")
            )
        )
        stubFor(
            get(urlPathEqualTo("/path2")).willReturn(
                aResponse().withStatus(400)
            )
        )
    }

    @Test
    fun test(): Unit = runBlocking {
        val result1 = async(Dispatchers.IO) {
            webClient.get().uri("/path1").retrieve().awaitBody<Response>()
        }
        val result2 = async(Dispatchers.IO) {
            webClient.get().uri("/path2").retrieve().awaitBody<Response>()
        }
        Pair(result1.await(), result2.await())
    }
}

data class Response(val data: String)

Stack trace:

2021-12-09 10:30:32.618 ERROR 55926 --- [ctor-http-nio-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182)
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:393)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
    at reactor.core.publisher.Operators.error(Operators.java:198)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165)
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167)
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160)
    at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
    at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:201)
    at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:457)
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:628)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)


org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from GET http://localhost:8089/path2

    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:196)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ 400 from GET http://localhost:8089/path2 [DefaultWebClient]
Stack trace:
        at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:196)
        at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:213)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onComplete(FluxDefaultIfEmpty.java:109)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:344)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:684)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Comment From: Garnerye

@spring-projects-issues

Comment From: nkonev

looks similar to https://github.com/spring-projects/spring-framework/issues/26699

Comment From: sdeleuze

Indeed, thanks for spotting this.