Hello, I am using the WebFlux WebClient to call third-party services. Everything works normally right after the service starts, but after it has been running for a while it keeps reporting the error below. What is the reason for this? Please help, thank you. Error Log: ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION F: [0]java.lang.IllegalArgumentException: ExchangeFilterFunction must not be null 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [1] at org.springframework.util.Assert.notNull(Assert.java:201) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [2] at org.springframework.web.reactive.function.client.ExchangeFilterFunction.andThen(ExchangeFilterFunction.java:55) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [3] at java.util.stream.ReduceOps$2ReducingSink.accept(ReduceOps.java:123) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [4] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [5] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [6] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [7] at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl 
- EXCEPTION : [8] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [9] at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:479) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [10] at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:263) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [11] at com..gateway.webflux.invoke.rest.WebClientRestHandler.invokeRest(WebClientRestHandler.java:78) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [12] at com.gateway.webflux.invoke.handler.DynamicProxyBeanFactory.invoke(DynamicProxyBeanFactory.java:48) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [13] at com.sun.proxy.$Proxy321.getWeather(Unknown Source) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [14] at com..platform.gateway.service.impl.WeatherServiceImpl.getWeather(WeatherServiceImpl.java:64) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [15] at com..gateway.controller.WeatherApi.getWeather$original$onPL55SP(WeatherApi.java:30) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [16] at com..gateway.controller.WeatherApi.getWeather$original$onPL55SP$accessor$tVP1x9qC(WeatherApi.java) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR 
c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [17] at com.gateway.controller.WeatherApi$auxiliary$pWJcjex4.call(Unknown Source) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [18] at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION I: [19] at com.gateway.controller.WeatherApi.getWeather(WeatherApi.java) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [20] at sun.reflect.GeneratedMethodAccessor731.invoke(Unknown Source) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [21] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [22] at java.lang.reflect.Method.invoke(Method.java:498) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [23] at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:148) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [24] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [25] at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385) 16:24:33.043 [686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl 
- EXCEPTION : [26] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [27] at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:58) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [28] at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.withActiveSpan(TracingSubscriber.java:79) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [29] at io.opentelemetry.javaagent.shaded.instrumentation.reactor.TracingSubscriber.onNext(TracingSubscriber.java:58) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [30] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1812) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [31] at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) 16:24:33.043 [,686503ac14f14c46,9e06877a66820510,true] [XNIO-1 I/O-1] ERROR c.y.e.p.p.g.s.impl.WeatherServiceImpl - EXCEPTION : [32] at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329)
Comment From: bclozel
Thanks for getting in touch, but it feels like this is a question that would be better suited to Stack Overflow. As mentioned in the guidelines for contributing, we prefer to use the issue tracker only for bugs and enhancements. Feel free to update this issue with a link to the re-posted question (so that other people can find it) or add some more details if you feel this is a genuine bug.
Comment From: Zhupro
Code: @Configuration public class WebClientBuilder {
@Autowired
WebReactorLoadBalancerExchangeFilterFunction filterFunction;
@Autowired
WebClientConfigProperties webClientConfigProperties;
private static final Logger log = LoggerFactory
.getLogger(WebClientBuilder.class);
/**
* webclient 负载均衡
*
* @param
* @return
*/
@Bean(name = "fluxWebClient")
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
public WebClient.Builder builder() {
//连接池
// ConnectionProvider provider = ConnectionProvider.builder("order") // .maxConnections(100) // .maxIdleTime(Duration.ofSeconds(30)) // .pendingAcquireTimeout(Duration.ofMillis(100)) // .build(); TcpClient tcpClient = TcpClient.create() .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConfigProperties.getConnectTime()) // 链接超时时长 .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(webClientConfigProperties.getReadTimeout())) // 读取数据超时时长 .addHandlerLast(new WriteTimeoutHandler(webClientConfigProperties.getReadTimeout()))); // 写数据超时时长
return WebClient.builder()
.filter(filterFunction) // webclient 负载均衡
// .filter(logRequest()) //请求拦截器 // .filter(logResponse()) //响应拦截器 .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))); }
@Configuration public class WebReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
private static final Logger log = LoggerFactory
.getLogger(WebReactorLoadBalancerExchangeFilterFunction.class);
private final LoadBalancerClientFactory loadBalancerFactory;
@Autowired
private Environment environment;
@Value("${properties.application.group:}")
private String service;
public WebReactorLoadBalancerExchangeFilterFunction(
LoadBalancerClientFactory loadBalancerFactory) {
this.loadBalancerFactory = loadBalancerFactory;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
URI originalUrl = request.url();
String host = originalUrl.getHost();
String serviceId = environment.getProperty("service-name." + host);
if (serviceId == null) {
String message = String.format("Request URI does not contain a valid hostname: %s", originalUrl.toString());
if (log.isWarnEnabled()) {
log.warn(message);
}
return Mono.just(ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build());
}
return choose(serviceId).flatMap(response -> {
ServiceInstance instance = response.getServer();
if (instance == null) {
String message = serviceInstanceUnavailableMessage(serviceId);
// if (log.isWarnEnabled()) { log.error(message); // } return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE) .body(serviceInstanceUnavailableMessage(serviceId)).build()); } if (log.isDebugEnabled()) { log.debug(String.format("Load balancer has retrieved the instance for service %s: %s", serviceId, instance.getUri())); } ClientRequest newRequest = buildClientRequest(request, reconstructURI(instance, originalUrl));
return next.exchange(newRequest).doOnEach((signal) -> {
//处理webclient结束 记录请求结束指标
if (!signal.isOnComplete()) {
Long startTime = WebfluxContext.getStartTime(signal.getContext());
ClientResponse clientResponse = signal.get();
Double requestTime = (double) (System.currentTimeMillis() - startTime) / 1000;
log.info("WebclientResponse: service:{} url:{} status:{} time:{}", service, newRequest.url(), clientResponse.rawStatusCode(), requestTime);
//指标
PrometheusConfig.API_SERVICE_HISTOGRAM.labels(service, newRequest.url().toString(), String.valueOf(clientResponse.rawStatusCode())).observe(requestTime);
}
}).subscriberContext(WebfluxContext::putStartTime);
});
}
protected URI reconstructURI(ServiceInstance instance, URI original) {
return LoadBalancerUriTools.reconstructURI(instance, original);
}
protected Mono<Response<ServiceInstance>> choose(String serviceId) {
ReactorLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerFactory.getInstance(serviceId, ReactorLoadBalancer.class,
ServiceInstance.class);
if (loadBalancer == null) {
return Mono.just(new EmptyResponse());
}
return Mono.from(loadBalancer.choose());
}
private String serviceInstanceUnavailableMessage(String serviceId) {
return "负载均衡 Load balancer 找不到WebClient服务ID: " + serviceId;
}
private ClientRequest buildClientRequest(ClientRequest request, URI uri) {
return ClientRequest.create(request.method(), uri)
.headers(headers -> headers.addAll(request.headers()))
.cookies(cookies -> cookies.addAll(request.cookies()))
.attributes(attributes -> attributes.putAll(request.attributes()))
.body(request.body()).build();
}
}
public class WebClientRestHandler implements RestHandler {
private WebClient.Builder clientBuilder;
private FluxPraseClassBean fluxPraseClassBean;
WebClientConfigProperties webClientConfigProperties;
Logger log = LoggerFactory.getLogger(WebClientRestHandler.class);
/**
* 初始化
*
* @param fluxPraseClassBean
* @return
*/
@Override
public void init(FluxPraseClassBean fluxPraseClassBean) {
this.fluxPraseClassBean = fluxPraseClassBean;
//获取全局配置clientBuilder
this.clientBuilder = (WebClient.Builder) ApplicationContextProvider.getBean("fluxWebClient");
this.clientBuilder.baseUrl(fluxPraseClassBean.getServiceUrl());
this.webClientConfigProperties = ApplicationContextProvider.getBean(WebClientConfigProperties.class);
}
/**
* 反射调用方法
*
* @param fluxPraseMethodBean
* @return
*/
@Override
public Object invokeRest(FluxPraseMethodBean fluxPraseMethodBean) {
Object result = null;
log.info("WebClientRequest param:{}, body:{}", JSON.toJSONString(fluxPraseMethodBean.getParams()), JSON.toJSONString(fluxPraseMethodBean.getBody()));
StringBuilder sb = new StringBuilder();
if (StringUtils.isNotBlank(fluxPraseClassBean.getUrlPrefix())) {
sb.append(fluxPraseClassBean.getUrlPrefix());
}
if (StringUtils.isNotBlank(fluxPraseMethodBean.getUrl())) {
sb.append(fluxPraseMethodBean.getUrl());
}
//这里可以得到参数数组和方法等,可以通过反射,注解等,进行结果集的处理
WebClient.RequestBodyUriSpec requestBodyUriSpec = this.clientBuilder
//请求方法
.build().method(fluxPraseMethodBean.getRequestMethod());
//请求地址
WebClient.RequestBodySpec responseBodySpec = null;
if (fluxPraseMethodBean.getParams() != null) {
if (fluxPraseMethodBean.getIsForm()) {
// responseBodySpec = requestBodyUriSpec.uri( uriBuilder -> uriBuilder.queryParams(getRequestParamMap(fluxPraseMethodBean.getParams())).build()).contentType(MediaType.APPLICATION_JSON); responseBodySpec = requestBodyUriSpec.uri(sb.toString(), uriBuilder -> uriBuilder.queryParams(getRequestParamMap(fluxPraseMethodBean.getParams())).build()).contentType(MediaType.APPLICATION_JSON); } else { responseBodySpec = requestBodyUriSpec.uri(sb.toString(), fluxPraseMethodBean.getParams()).contentType(MediaType.APPLICATION_JSON); } } else { responseBodySpec = requestBodyUriSpec.uri(sb.toString()).contentType(MediaType.APPLICATION_JSON); } //接收格式 WebClient.ResponseSpec responseSpec = null; if (fluxPraseMethodBean.getBody() != null) { responseSpec = responseBodySpec.body(BodyInserters.fromObject(JSON.toJSONString(fluxPraseMethodBean.getBody())))
.accept(MediaType.APPLICATION_JSON)
//发出请求
.retrieve();
} else {
responseSpec = responseBodySpec.accept(MediaType.APPLICATION_JSON)
//发出请求
.retrieve();
}
//处理请求
if (fluxPraseMethodBean.getReturnFlux()) {
Flux flux = fluxHandle(responseSpec, fluxPraseMethodBean);
result = HystrixCommands
.from(flux)
.fallback(throwable -> {
HystrixUtils.checkHystrixStatus(fluxPraseMethodBean.getHystrixCommonName());
return flux;
})
.commandName(fluxPraseMethodBean.getHystrixCommonName())
.toFlux();
} else {
Mono mono = monoHandle(responseSpec, fluxPraseMethodBean);
result = HystrixCommands
.from(mono)
.fallback(throwable -> {
HystrixUtils.checkHystrixStatus(fluxPraseMethodBean.getHystrixCommonName());
return mono;
})
.commandName(fluxPraseMethodBean.getHystrixCommonName())
.toMono();
}
return result;
}
}