Description: Spring webflux internals break during devtools restart when WebClient bean is customized with a custom client connector.
spring boot version: Tested on 2.1.7.RELEASE and 2.2.0.M5 with a fresh kotlin start.spring.io project:
Only added 2 files to what was produced by start.spring.io: Controller (to show failure case):
@Controller
class TestController(
private val webClient: WebClient
) {
@GetMapping("/test")
fun test(): Mono<String> {
val result = webClient.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve()
return result.bodyToMono(String::class.java)
}
}
Configuration:
@Configuration
class TestConfiguration {
@Bean
fun webClient(webClientBuilder: WebClient.Builder): WebClient {
val tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.doOnConnected {
it.addHandlerLast(ReadTimeoutHandler(60))
}
val connector = ReactorClientHttpConnector(HttpClient.from(tcpClient))
return webClientBuilder.clientConnector(connector).build()
}
}
Replication Steps:
1. Create the simple application as described above.
2. Hit localhost:8080/test
. The first time, you will see the data come in.
3. Change the url in the controller, and rebuild the project.
4. Hit localhost:8080/test
again. This time, there will be a netty error:
2019-08-13 11:21:03.925 ERROR 14689 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [68419476] 500 Server Error for HTTP GET "/test"
java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET https://postman-echo.com/get?a=322 [DefaultWebClient]
|_ checkpoint ⇢ Handler com.test.test.TestController#test() [DispatcherHandler]
|_ checkpoint ⇢ HTTP GET "/test" [ExceptionHandlingWebHandler]
Stack trace:
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:333) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:766) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:472) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:69) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:322) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:120) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:248) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4030) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3899) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3835) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:200) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:169) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:129) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:334) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:503) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$2(PooledConnectionProvider.java:166) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:332) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoRetryPredicate.subscribeOrReturn(MonoRetryPredicate.java:51) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:46) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:335) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:240) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2138) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:285) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:160) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:46) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2138) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1946) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1820) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:436) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:161) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) ~[netty-codec-4.1.38.Final.jar:4.1.38.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) ~[netty-codec-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
A temporary workaround is to not specify a custom client connector, but it'd be nice to be able to keep so we can hook in handlers, etc.
Thanks in advance!
Comment From: wilkinsona
Here's a single file that reproduces the problem:
package com.example.demo;
import java.net.URI;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
@SpringBootApplication
public class Gh17856Application {
public static void main(String[] args) {
SpringApplication.run(Gh17856Application.class, args);
}
@Bean
WebClient webClient(WebClient.Builder webClientBuilder) {
TcpClient tcpClient = TcpClient.create();
ClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.from(tcpClient));
return webClientBuilder.clientConnector(connector).build();
}
}
@RestController
class TestController {
private final WebClient webClient;
TestController(WebClient webClient) {
this.webClient = webClient;
}
@GetMapping("/test")
Mono<String> test() {
ResponseSpec result = webClient.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve();
return result.bodyToMono(String.class);
}
}
The SingleThreadEventExecutor
is rejecting the execute
call is doing so as it was shut down when the application context was closed for the DevTools-triggered restart. Closing the context calls ReactorResourceFactory.destroy()
which disposes of all the Reactor Netty resources. I've yet to figure out why the disposed executor it apparently still being used by the WebClient
that's built following the restart.
Comment From: wilkinsona
This is looking like a Reactor Netty problem to me. Here's an attempt at reproducing the problem without Boot's involvement:
package com.example.demo;
import java.net.URI;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
public class StandaloneReproduction {
public static void main(String[] args) {
perform();
perform();
}
private static void perform() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.afterPropertiesSet();
WebClient.Builder builder = WebClient.builder();
TcpClient tcpClient = TcpClient.create();
WebClient client = builder.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();
System.out.println(client.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve().bodyToMono(String.class).block());
factory.destroy();
}
}
With Reactor Netty 0.8, the second perform()
attempt fails with an NPE:
Exception in thread "main" java.lang.NullPointerException
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:69)
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:322)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:120)
at io.netty.channel.pool.SimpleChannelPool.connectChannel(SimpleChannelPool.java:263)
at io.netty.channel.pool.SimpleChannelPool.acquireHealthyFromPoolOrNew(SimpleChannelPool.java:175)
at io.netty.channel.pool.SimpleChannelPool.notifyHealthCheck(SimpleChannelPool.java:248)
at io.netty.channel.pool.SimpleChannelPool.doHealthCheck(SimpleChannelPool.java:223)
at io.netty.channel.pool.SimpleChannelPool.access$100(SimpleChannelPool.java:41)
at io.netty.channel.pool.SimpleChannelPool$3.run(SimpleChannelPool.java:195)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at com.example.demo.StandaloneReproduction.perform(StandaloneReproduction.java:26)
at com.example.demo.StandaloneReproduction.main(StandaloneReproduction.java:17)
With 0.9, the failure is closer to the one reported above:
Exception in thread "main" java.lang.IllegalStateException: executor not accepting a task
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:60)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET https://postman-echo.com/get?a=42 [DefaultWebClient]
Stack trace:
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:60)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:196)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:50)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:184)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:170)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:527)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:98)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:982)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:505)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:416)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:475)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1514)
at com.example.demo.StandaloneReproduction.perform(StandaloneReproduction.java:26)
at com.example.demo.StandaloneReproduction.main(StandaloneReproduction.java:17)
@violetagg I'm starting to get really out of my depth here. Can you please help me out and take a look?
Comment From: violetagg
@wilkinsona
The example is not quite correct.
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.afterPropertiesSet();
The code above creates the global HttpResources like the threads and the connection pool However this code below
TcpClient tcpClient = TcpClient.create();
WebClient client = builder.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();
Specifies * Create Tcp client with a connection pool from the global TcpResources and choose whatever you have for the threads. In pure TCP use case this will be the threads from the global TcpResources, but in HTTP use case this will be the threads from the global HttpResources * Create Http client from this Tcp client
So at the end you are running with a connection pool from the global TcpResources, but with threads from the global HttpResources
With the code below you destroy global HttpResources but not global TcpResources.
factory.destroy();
There are two possible solutions
Create Tcp client with global HttpResources
TcpClient tcpClient = TcpClient.create(factory.getConnectionProvider()).runOn(factory.getLoopResources());
Or destroy global TcpResources
TcpResources.disposeLoopsAndConnections();
Comment From: wilkinsona
Thanks very much, @violetagg.
Applying the advice to the original problem and aligning with the recommendation in the reference docs to use a bean to customise the client connector, results in a bean definition like the following:
@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
TcpClient tcpClient = TcpClient.create(resourceFactory.getConnectionProvider())
.runOn(resourceFactory.getLoopResources())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.doOnConnected((connection) -> connection.addHandlerLast(new ReadTimeoutHandler(60)));
return new ReactorClientHttpConnector(HttpClient.from(tcpClient));
}
In this particular case a WebClient
bean is also required so a second bean is defined for that as shown in the following example:
@Bean
WebClient webClient(WebClient.Builder builder) {
return builder.build();
}
The documentation already mentions the ReactorResourceFactory
and that, by default, it's used for server and client resources. I wonder if we should add something to tie things together a bit more and suggest/recommend injecting the ReactorResourceFactory
and using it as shown above when customising the client connector. What do you think, @bclozel?
Comment From: violetagg
One question: why do you create the HttpClient from a TcpClient and not directly with HttpClient.create()? The last will use by default the connection pool and the threads from the HttpResources and thus you do not need to specify them.
Comment From: wilkinsona
I'd assumed that it was the only way to configure a connect timeout and to add a read timeout handler. If that's possible via HttpClient
then it sounds like it would be a nice additional refinement.
Comment From: violetagg
you can do it like this
@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
HttpClient httpClient =
HttpClient.create()
.tcpConfiguration(tcpClient ->
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.doOnConnected((connection) ->
connection.addHandlerLast(new ReadTimeoutHandler(60))));
return new ReactorClientHttpConnector(httpClient);
}
Comment From: wilkinsona
Thanks, @violetagg. I've just realised that one downside to the above is that it's not using the injected resourceFactory
. If the ReactorResourceFactory
has been customized so that it doesn't use the global resources a problem similar to that originally described in this issue may occur.
Comment From: wilkinsona
We're going to add a how-to in the reference docs showing how to customise the TCP client while sharing resources between WebClient and server.
Comment From: yonhbu
you can do it like this
@Bean ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) { HttpClient httpClient = HttpClient.create() .tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000) .doOnConnected((connection) -> connection.addHandlerLast(new ReadTimeoutHandler(60)))); return new ReactorClientHttpConnector(httpClient); }
How can to do the unit testing and this methodo?