Affects: \
I‘m using webclient, and I need to override the response header.
@RequestMapping("/token")
public Mono getToken() {
return wechatService.getWebAuthAccessToken()
.switchIfEmpty(Mono.error(new BizErrorException("xxxx failed")));
}
public Mono<WxGetAccessTokenResponse> getWebAuthAccessToken() {
commonWebClient.get()
.uri("xxxxxx")
.queryParam("xxx", xxxx)
.exchangeToMono(clientResponse -> clientResponse.mutate()
// override the content type
.headers(headers -> headers.remove(HttpHeaders.CONTENT_TYPE))
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
// copy the body as bytes with no processing
.body(clientResponse.body(BodyExtractors.toDataBuffers()))
.build()
.bodyToMono(WxGetAccessTokenResponse.class))
.onErrorReturn(xxx)
}
I got error: Only one connection receive subscriber allowed
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:183) ~[reactor-netty-core-1.0.7.jar:1.0.7]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Body from GET https://xxxxx [DefaultClientResponse]
|_ checkpoint ⇢ Body from GET https://xxxxx [DefaultClientResponse]
|_ checkpoint ⇢ Handler xxxx.controller.TokenController#getToken() [DispatcherHandler]
[DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "xxxxxx" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:183) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:173) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:415) ~[reactor-netty-http-1.0.7.jar:1.0.7]
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:202) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:458) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:629) ~[reactor-netty-http-1.0.7.jar:1.0.7]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at java.base/java.lang.Thread.run(Thread.java:836) ~[na:na]
But when I wrote code like this, everything works fine.
public Mono<WxGetAccessTokenResponse> getWebAuthAccessToken() {
commonWebClient.get()
.uri("xxxxxx")
.exchange() // deprecated
.flatMap(clientResponse -> ClientResponse.from(clientResponse) // deprecated
// override the content type
.headers(headers -> headers.remove(HttpHeaders.CONTENT_TYPE))
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
// copy the body as bytes with no processing
.body(clientResponse.body(BodyExtractors.toDataBuffers()))
.build()
.bodyToMono(WxGetAccessTokenResponse.class))
.onErrorReturn(xxx)
}
But the exchange()
and ClientResponse.from()
method are deprecated.
Comment From: poutsma
What is the original content-type that you are trying to replace? My guess is that it's a custom mime type that is not supported by default by the JSON decoders, and you're trying to change the content-type to make sure that the decoder does get called.
If this is the problem you're trying to solve, instead changing the Content-Type, you are probably better off setting up the decoders so that they do support the custom mime type. This stack overflow answer shows you how you can set up the Jackson2JsonDecoder
so that it supports a different mime type: https://stackoverflow.com/a/57046640
Comment From: rstoyanchev
When you use ClientResponse#mutate
, don't set the body unless you actually want to change it. If you do set it, we assume it's different content, and release the original body. So just remove the body
method.
Comment From: aboutZZ
When you use
ClientResponse#mutate
, don't set the body unless you actually want to change it. If you do set it, we assume it's different content, and release the original body. So just remove thebody
method.
Thanks a lot, by removing the body
method, it works!🎉