Description: When observability is enabled in a Spring Boot application that uses Spring Kafka, the application is unable to obtain Kafka cluster information, and the clusterId call fails with a TimeoutException. We can send messages with spring.kafka.template.observation-enabled: false, but not with spring.kafka.template.observation-enabled: true. The relevant log output and stack trace are included below.

Steps to Reproduce:

  1. Enable observability by setting spring.kafka.template.observation-enabled to true.
  2. Use Spring Kafka in a Spring Boot 3.2.0 application.
  3. Send a message using kafkaTemplate.
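
For reference, step 1 corresponds to the following configuration, assuming the application is configured through application.yml (in application.properties the equivalent is spring.kafka.template.observation-enabled=true):

```yaml
spring:
  kafka:
    template:
      # Turns on Micrometer observation for KafkaTemplate sends
      observation-enabled: true
```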

Expected Behavior: The application should obtain Kafka cluster information successfully and send the message even when observability is enabled.

Actual Behavior: A TimeoutException occurs during the clusterId call, and the application fails to obtain Kafka cluster information.

Environment:

Spring Boot version: 3.2.0
Kafka version: 3.4.0

Logs:

16:14:29.750 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Node -1 disconnected.
16:14:29.750 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 43 due to node -1 being disconnected (elapsed time since creation: 212ms, elapsed time since send: 212ms, request timeout: 3600000ms)
16:14:29.869 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
16:14:29.971 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- App info kafka.admin.client for adminclient-1 unregistered
16:14:29.971 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
16:14:29.971 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Timed out 1 remaining operation(s) during close.
16:14:29.973 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- Metrics scheduler closed
16:14:29.973 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- Closing reporter org.apache.kafka.common.metrics.JmxReporter
16:14:29.973 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- Metrics reporters closed
16:14:29.996 [http-nio-8085-exec-2] ERROR [peer-routing, traceId: 6571a1d1550947dbb1d77f8b3073c664, spanId: b1d77f8b3073c664] --- Could not obtain cluster info
java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.kafka.core.KafkaAdmin.clusterId(KafkaAdmin.java:335)
    at org.springframework.kafka.core.KafkaTemplate.clusterId(KafkaTemplate.java:504)
    at org.springframework.kafka.support.micrometer.KafkaRecordSenderContext.<init>(KafkaRecordSenderContext.java:45)
    at org.springframework.kafka.core.KafkaTemplate.lambda$observeSend$3(KafkaTemplate.java:752)
    at io.micrometer.observation.Observation.createNotStarted(Observation.java:172)
    at io.micrometer.observation.docs.ObservationDocumentation.observation(ObservationDocumentation.java:188)
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:750)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:565)
    at com.maersk.fbm.integration.controller.RoutingController.sendKafkaMessage(RoutingController.java:144)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:577)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:254)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:182)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:917)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:829)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:903)
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:564)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885)
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:340)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:833)

Comment From: wilkinsona

Spring Boot isn't really involved at this level; the exception that you've reported is more closely related to Spring Kafka and Kafka itself.

I suspect that the exception may be due to a more general problem with your Kafka cluster and isn't specifically related to observability. The earlier log messages suggest as much:

16:14:29.750 [kafka-admin-client-thread | adminclient-1] INFO  [peer-routing, traceId: , spanId: ] --- [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 43 due to node -1 being disconnected (elapsed time since creation: 212ms, elapsed time since send: 212ms, request timeout: 3600000ms)

My suspicion is that if you called org.springframework.kafka.core.KafkaAdmin.clusterId() directly from within your application, it too would time out due to these problems with the cluster. Please check the health of your cluster and address the node disconnection. If problems remain after that, Stack Overflow or a Spring Kafka discussion are better places to seek help.
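
One quick way to begin checking cluster health is to confirm that each bootstrap server accepts TCP connections from the machine running the application. The following is a minimal sketch using only the JDK; the class name BrokerReachability, the canConnect helper, and the localhost:9092 default are hypothetical and not taken from the report. A successful connection shows only that the port is reachable. It does not rule out a security protocol or listener misconfiguration, which can also cause a disconnect shortly after connecting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerReachability {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical default; pass the real bootstrap servers as the first argument,
        // for example "broker1:9092,broker2:9092".
        String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        for (String server : bootstrapServers.split(",")) {
            String trimmed = server.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                System.out.println(trimmed + " skipped: expected host:port");
                continue;
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            System.out.println(trimmed + " reachable: " + canConnect(host, port, 3000));
        }
    }
}
```

If every bootstrap server is reachable and the AdminClient still disconnects, the next thing to compare is the client's security and listener settings against the broker's.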

Comment From: rabellofernando

I've got the same problem. Did you fix it?