Expected Behavior
I would like to suggest externalizing the authExceptionRetryInterval property used in the org.springframework.kafka.listener.ConsumerProperties class, allowing its configuration through application.properties.
Something like: spring.kafka.listener.auth-exception-retry-interval=5000
It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.
Current Behavior
Currently, this configuration must be implemented manually in ConcurrentKafkaListenerContainerFactory and only supports a fixed retry interval.
Example:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMillis(4000L));
    return factory;
}
Context
This way, we would avoid the need to implement code to activate this mechanism, which provides resilience for consumer applications.
Comment From: artembilan
The spring.kafka properties have nothing to do with this project.
That is a Spring Boot feature: https://docs.spring.io/spring-boot/reference/messaging/kafka.html
We will transfer this issue over there.
Comment From: wilkinsona
A property for authExceptionRetryInterval has been added recently in https://github.com/spring-projects/spring-boot/issues/44199.
It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.
Can you please provide an example of what you have in mind here?
Comment From: artembilan
We already have publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason));
which might give you an opportunity to deal with those errors in a retry manner.
Comment From: leonardowestphal
A property for authExceptionRetryInterval has been added recently in #44199.
Nice, thanks!
It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.
Can you please provide an example of what you have in mind here?
I’m not sure if my understanding is 100% correct, so I ask you to check whether what I’m suggesting makes sense or not.
From my analysis, the handling of fatal authentication errors occurs in the run() method of KafkaMessageListenerContainer:
catch (AuthorizationException | AuthenticationException ae) {
    if (this.authExceptionRetryInterval == null) {
        this.logger.error(ae, "Authentication/Authorization Exception and no authExceptionRetryInterval set");
        this.fatalError = true;
        exitThrowable = ae;
        break;
    }
    this.logger.error(ae, "Authentication/Authorization Exception, retrying in " + this.authExceptionRetryInterval.toMillis() + " ms");
    KafkaMessageListenerContainer.this.publishRetryAuthEvent(ae);
    failedAuthRetry = true;
    this.sleepFor(this.authExceptionRetryInterval);
}
It would be possible to implement what I have in mind in different ways, just as it is for other types of errors, but I will try to provide a general idea.
// Perhaps there could be something similar to the @RetryableTopic annotation, but specifically for fatal errors, allowing the definition of a dedicated retry policy.
// Maybe an attribute similar to include could be added to handle specific fatal errors.
// In my view, handling fatal errors such as authentication failures should be different from handling errors during record processing. That's why the idea was to create something specific and separate.
@KafkaListener(topics = "my-topic-a", concurrency = "2")
@RetryableTopic(backoff = @Backoff(delay = 5000, multiplier = 2, maxDelay = 50000), attempts = "3", include = AuthenticationException.class)
public void test(ConsumerRecord<Integer, String> record) throws InterruptedException {
    // processing logic...
}
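To make the retry schedule implied by these @Backoff values concrete, here is a minimal plain-Java sketch. The AuthRetryBackoff class and its schedule method are hypothetical names used only for illustration; they are not part of Spring Kafka, and the container currently supports only the fixed authExceptionRetryInterval.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not a Spring Kafka class. */
final class AuthRetryBackoff {

    private AuthRetryBackoff() {
    }

    /**
     * Computes the delay before each retry: the initial delay, then the
     * previous delay times the multiplier, with every value capped at maxDelay.
     */
    static List<Duration> schedule(Duration initial, double multiplier, Duration maxDelay, int retries) {
        List<Duration> delays = new ArrayList<>();
        double nextMillis = initial.toMillis();
        for (int i = 0; i < retries; i++) {
            long capped = (long) Math.min(nextMillis, maxDelay.toMillis());
            delays.add(Duration.ofMillis(capped));
            // Cap the running value as well so it cannot grow without bound.
            nextMillis = Math.min(nextMillis * multiplier, maxDelay.toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        // delay = 5000, multiplier = 2, maxDelay = 50000, as in the annotation above
        schedule(Duration.ofMillis(5000), 2.0, Duration.ofMillis(50000), 5)
                .forEach(d -> System.out.println(d.toMillis() + " ms"));
    }
}
```

With those values, five retries would wait 5000, 10000, 20000, 40000 and 50000 ms, the last one being limited by maxDelay.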
// Another approach would be to allow the creation of a specific ErrorHandler for fatal exceptions (e.g. DefaultFatalErrorHandler), similar to DefaultErrorHandler. In it, we could also define our retry policy.
@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(4500, 5);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // processing logic...
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(AuthenticationException.class);
    return errorHandler;
}
I emphasize that if my understanding is incorrect, please let me know.
Thank you in advance.
Comment From: leonardowestphal
We already have
publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason));
which might give you an opportunity to deal with those error in retry manner.
I don’t see how I could use this together with a listener created with the @KafkaListener annotation. I would have to reimplement my consumers in a different way! How did you envision its usage?
Comment From: artembilan
The AuthenticationException is a special connection error which has nothing to do with the user's consumer logic, which is where that retry and back-off logic is actually applied.
Therefore we cannot apply a similar configuration to this kind of error.
The ConsumerRetryAuthEvent has everything you need with regard to the mentioned @KafkaListener.
The latter registers a ListenerContainer object in the KafkaListenerEndpointRegistry.
Therefore, from that event you can determine exactly which @KafkaListener is failing to connect due to an auth error.
So, you can simply stop it from your @EventListener.
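As a rough illustration of this suggestion, the decision logic behind such an @EventListener could look like the plain-Java sketch below. AuthRetryTracker is a hypothetical helper, not a Spring Kafka class, and the Spring wiring in the trailing comment is an assumption about how it might be called (it relies on the event exposing its container and on the container's listener id) rather than code from this thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: counts auth-retry notifications per listener id and
 * reports when a listener has used up its allowed number of retries.
 */
final class AuthRetryTracker {

    private final int maxRetries;
    private final Map<String, AtomicInteger> retriesByListenerId = new ConcurrentHashMap<>();

    AuthRetryTracker(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one retry; returns true once the listener has exceeded maxRetries. */
    boolean recordRetry(String listenerId) {
        int count = retriesByListenerId
                .computeIfAbsent(listenerId, id -> new AtomicInteger())
                .incrementAndGet();
        return count > maxRetries;
    }

    /** Clears the counter, for example after the listener has been restarted. */
    void reset(String listenerId) {
        retriesByListenerId.remove(listenerId);
    }
}

// Assumed wiring inside a Spring bean (not compiled here):
//
// @EventListener
// public void onAuthRetry(ConsumerRetryAuthEvent event) {
//     MessageListenerContainer container = event.getContainer(MessageListenerContainer.class);
//     if (tracker.recordRetry(container.getListenerId())) {
//         container.stop(() -> { });  // callback variant, so the consumer thread is not blocked
//     }
// }
```

Keeping the counting separate from the event handling means the retry limit can be tested without a broker, and the listener method only has to decide whether to stop the container.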
Either way, this looks like a totally different story, not related to the original request to expose the property.
Please confirm, and we can close this as a duplicate of the mentioned issue, which already has the desired fix.
Comment From: leonardowestphal
You can close this. My mistake was assuming that the property was linked to spring-kafka, which caused the topics to get mixed up... my bad.
My idea was to implement a more customized authentication retry to prevent the container from stopping and needing a restart. But for now, the retry using AuthExceptionRetryInterval meets my needs. I will take a closer look at this ConsumerRetryAuthEvent issue. Thanks!