In previous versions of Spring-Boot there was an inbuild health indicator for Kafka, however somewhere along the way it was lost.

Refs: * https://github.com/spring-projects/spring-boot/pull/11515 * KafkaHealthIndicator

Please add the HealthIndicator for Kafka again and add metrics as well. This can be achieved using the following code:

(includes both metrics and health)

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin admin;

    @Autowired
    private MeterRegistry meterRegistry;

    @Autowired
    private Map<String, KafkaTemplate<?, ?>> kafkaTemplates;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(admin.getConfig());
    }

    @SuppressWarnings("deprecation") // Can be avoided by relying on Double.NaN for non doubles.
    @PostConstruct
    private void initMetrics() {
        final String kafkaPrefix = "kafka.";
        for (Entry<String, KafkaTemplate<?, ?>> templateEntry : kafkaTemplates.entrySet()) {
            final String name = templateEntry.getKey();
            final KafkaTemplate<?, ?> kafkaTemplate = templateEntry.getValue();
            for (Metric metric : kafkaTemplate.metrics().values()) {
                final MetricName metricName = metric.metricName();
                final Builder<Metric> gaugeBuilder = Gauge
                        .builder(kafkaPrefix + metricName.name(), metric, Metric::value) // <-- Here
                        .description(metricName.description());
                for (Entry<String, String> tagEntry : metricName.tags().entrySet()) {
                    gaugeBuilder.tag(kafkaPrefix + tagEntry.getKey(), tagEntry.getValue());
                }
                gaugeBuilder.tag("bean", name);
                gaugeBuilder.register(meterRegistry);
            }
        }
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator() {
        final DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(1000);
        final AdminClient adminClient = kafkaAdminClient();
        return () -> {
            final DescribeClusterResult describeCluster = adminClient.describeCluster(describeClusterOptions);
            try {
                final String clusterId = describeCluster.clusterId().get();
                final int nodeCount = describeCluster.nodes().get().size();
                return Health.up()
                        .withDetail("clusterId", clusterId)
                        .withDetail("nodeCount", nodeCount)
                        .build();
            } catch (InterruptedException | ExecutionException e) {
                return Health.down()
                        .withException(e)
                        .build();
            }
        };

    }

}

Feel free to use or modify the code as you see fit.

Comment From: snicoll

However somewhere along the way it was lost.

It wasn't lost, It was reverted for the reason exposed in #12225. If you have something that address the concern expressed there, I am more than happy to hear from you. Thanks for sharing but a piece of code with no tests is not something we can use.

As for the metrics support, this is unrelated and we don't deal with several topics in a single issue. There is already an issue in the micrometers project that you could subscribe to.

Comment From: ST-DDT

Thanks for sharing but a piece of code with no tests is not something we can use.

Thats why its a feature request and not a pull request. The code is just an example that might help someone, who knows the internals of Spring, but not the internals of Kafka, to implement this feature. (Or as a snippet to copy for anybody else who wants to use it)

As for the metrics support, this is unrelated and we don't deal with several topics in a single issue.

Sorry. I thought both of them would be monitoring, but I'll use separate issues for that in the future.

Comment From: snicoll

As I've already indicated we've tried to implement it already. See #12225 and the reasons why it got reverted. If you can help in that area we're most certainly interested.

Comment From: spring-projects-issues

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

Comment From: ST-DDT

I currently cannot help you with that.

Comment From: MartinX3

That's my personal solution. If the kafka bus is down (standard timeout 60 seconds) it will show the kafka as "down" at the actuator health endpoint.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Component
public class KafkaHealthIndicator implements HealthIndicator {
    private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

    private KafkaTemplate<String, String> kafka;

    public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
        this.kafka = kafka;
    }

    /**
     * Return an indication of health.
     *
     * @return the health for
     */
    @Override
    public Health health() {
        try {
            kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Health.down(e).build();
        }
        return Health.up().build();
    }
}

Comment From: vspiliopoulos

In my opinion, Kafka health status should not be under /actuator/health, but under actuator/info, or at least there should be the option for the client to select where to place it. The reason is that microservices usually use /health endpoint status (UP/ DOWN) to scale up or down the microservice itself. Kafka broker being healthy or not is not a reason to scale up or down.

Comment From: philwebb

@vspiliopoulos We'd rather not conflate Health and Info endpoints, but you might want to track #14022 which aims to address the use-case you describe.

Comment From: zacyang

I have been looking into some out-of-box solution for health indicator for Kafka.

It worth to notice that @MartinX3's solution can only provide connectivity health check while @ST-DDT 's solution can provide connectivity health check and some meta info of the cluster.

It would have to combine acks min.insync.replicas and nodeCount to give a meaningful health indicator, (as it how Kafka consider a message is committed). Otherwise, we could end up with health up but any message sent would end up with a failure.

Comment From: otavioprado

Any update about it?

Comment From: edgardrp

I've seen some examples of the /health endpoint in which it seems there's a "broker" component, it would be terrific if that is filled with something like @MartinX3 or @ST-DDT have pointed out. Is there any update on this?

Comment From: onobc

@snicoll @wilkinsona I would ❤️ to pick this work back up.

At my company we have many many apps all implementing their own copy/pastad variant of health checks for Kafka and KafkaStreams. I was getting ready to write an internal starter but of course wanted to first see if we could add it back into Spring Boot.

I came across this issue (and the others related to the revert) and understand the reason for reverting. What are your thoughts about this in 2021? 😸

Comment From: wilkinsona

I'm afraid I don't know enough about Kafka and assessing its health to know for certain what we should do here.

Comment From: onobc

Yeh, It is really unfortunate that Kafka does not provide a mechanism to check health or at least give an opinion on a recommended approach.

Looking back through the revert ticket it seems like w/ the opt-in repl factor check and the updates to the admin client that it was really close to being usable. It also seems that you and @snicoll came across another roadblock that made a case for reverting. I know there are 1001 other things to be dealing w/ so I'm not trying to re-hash the past - but rather understand the limitations and see if there is something that would make sense out of the box.

I am not super familiar w/ Cassandra but I see in the CassandraDriverHealthIndicator that it is considered healthy if at least 1 node reports as "up". How is this case different?

I am curious to get @garyrussell opinion on what a "good" Kafka health indicator would be. He seems to dabble in Kafka from time to time ;)

Comment From: garyrussell

One of the stumbling blocks is when using transactions - the number of active brokers and in-sync replicas are broker configuration properties, which are not available on the client.

If an application is using transactions and there are not enough brokers to publish a record to a particular partition, the producer hangs until a timeout occurs. It is made more complicated because min.insync.replicas is also a topic-level configuration.

There are just too many of these corner cases to come up with a single robust health indicator.

Comment From: onobc

the number of active brokers and in-sync replicas are broker configuration properties, which are not available on the client.

It is made more complicated because min.insync.replicas is also a topic-level configuration.

Thanks for clarifying @garyrussell. I understand the concern in those other issues now.

Comment From: onobc

I promise not to turn this into a KafkaStreams thread discussion but am curious if there are hidden "stumbling blocks" for KafkaStreams in this area as well?

For KafkaStreams health indicators at my company we have been using the KafkaStreams instance (as returned from StreamsBuilderFactoryBean state, specifically KafkaStreams.state().isRunningOrRebalancing() and it has been working well for us. Is there some other underlying issue w/ this approach that I am not aware of that would make this not a good choice for a robust KafkaStreams health indicator?

Comment From: garyrussell

I you are using exactly once semantics, KafkaStreams will be affected by the insufficient in-sync replicas problem too.

Comment From: onobc

I you are using exactly once semantics, KafkaStreams will be affected by the insufficient in-sync replicas problem too.

Thanks for that info @garyrussell , good to know. We are not using that currently in our streams app.

It seems that "transaction" / "exactly once semantics" case is what adds a good deal of complexity to this (via need for in-sync replicas check). I wonder if it would make sense to add a simple health check that does not cover that case. Although, I don't think there is a good way to conditionally auto-configure that based on whether or not the app is configuring/using that feature of Kafka/KafkaStreams and it could be confusing to users if it works for all but that case.

Comment From: wilkinsona

Although, I don't think there is a good way to conditionally auto-configure that based on whether or not the app is configuring/using that feature of Kafka/KafkaStreams and it could be confusing to users if it works for all but that case.

I think that's the crux of this one and, as such, I don't think we should try to provide one. I think the risk of giving an inaccurate status is too high.

IMO, we should close this one. Let's see what the rest of the team thinks.

Comment From: onobc

After digging in more, I agree that there is not an easy way to provide a one-size-fits-all solution. Closing this ticket would probably solidify that decision.

If something is made available by Kafka in the future that makes this feasible, then a ticket can be created at that time.

Comment From: philwebb

+1 to closing. I'm going to go ahead and do that. Thanks for your efforts @bono007