Version: 2.5.1 Class: org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer
Kafka producer factory provides a closeable interface to close a Kafka producer (the same can be achieved with the close() method).
try (Producer producer = producerFactory().createProducer()) {
...
producer.flush();
}
However, a bug in the code prevents closing and removing a producer.
The close() method, in normal execution runs the following code (line 734):
this.closed = this.removeProducer.test(this, timeout);
The line of code calls a method to remove the current producer and return a 'success' indication to the closed property
However, the removeProducer() contains the following code:
if (producerToRemove.closed) {
// remove the producer
return true;
} else {
return false;
}
The bug seems to be in the if statement which should have been:
if (!producerToRemove.closed) {
The existing code will always return false from removeProducer() since from line 734 this.closed is always false
Comment From: snicoll
@ofirgrm thanks for the report but Spring Kafka is managed in a dedicated repo. Can you please move this to https://github.com/spring-projects/spring-kafka/ ?