Expected Behavior

I would like to suggest externalizing the authExceptionRetryInterval property used in the org.springframework.kafka.listener.ConsumerProperties class, allowing its configuration through application.properties.

Something like: spring.kafka.listener.auth-exception-retry-interval=5000

It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.

Current Behavior

Currently, this configuration must be implemented manually in ConcurrentKafkaListenerContainerFactory and only supports a fixed retry interval.

Example:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Retry at a fixed 4-second interval after an authentication/authorization failure
    factory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMillis(4000L));
    return factory;
}

Context

This way, we would avoid the need to implement code to activate this mechanism, which provides resilience for consumer applications.

Comment From: artembilan

The spring.kafka properties have nothing to do with this project. They are a Spring Boot feature: https://docs.spring.io/spring-boot/reference/messaging/kafka.html We will transfer this issue over there.

Comment From: wilkinsona

A property for authExceptionRetryInterval has been added recently in https://github.com/spring-projects/spring-boot/issues/44199.

It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.

Can you please provide an example of what you have in mind here?

Comment From: artembilan

We already have publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason)); which might give you an opportunity to deal with those errors in a retry manner.

Comment From: leonardowestphal

A property for authExceptionRetryInterval has been added recently in #44199.

Nice, thanks!

It would also be interesting if there were an option to configure a backoff policy or even allow the implementation of a custom handler for fatal exceptions like AuthenticationException.

Can you please provide an example of what you have in mind here?

I’m not sure if my understanding is 100% correct, so I ask you to check whether what I’m suggesting makes sense or not.

From my analysis, the handling of fatal authentication errors occurs in the run() method of KafkaMessageListenerContainer.ListenerConsumer, as shown in the code below. However, the handling of errors during the processing of a record happens at a different moment, and all handling mechanisms for this type of error only work for non-fatal errors.

catch (AuthorizationException | AuthenticationException ae) {
    if (this.authExceptionRetryInterval == null) {
        this.logger.error(ae, "Authentication/Authorization Exception and no authExceptionRetryInterval set");
        this.fatalError = true;
        exitThrowable = ae;
        break;
    }

    this.logger.error(ae, "Authentication/Authorization Exception, retrying in " + this.authExceptionRetryInterval.toMillis() + " ms");
    KafkaMessageListenerContainer.this.publishRetryAuthEvent(ae);
    failedAuthRetry = true;
    this.sleepFor(this.authExceptionRetryInterval);
}

It would be possible to implement what I have in mind in different ways, just as it is for other types of errors, but I will try to provide a general idea.

// Perhaps there could be something similar to the @RetryableTopic annotation, but specifically for fatal errors, allowing the definition of a dedicated retry policy.
// Maybe an attribute similar to include could be added to handle specific fatal errors.
// In my view, handling fatal errors such as authentication failures should be different from handling errors during record processing. That's why the idea was to create something specific and separate.
@KafkaListener(topics = "my-topic-a", concurrency = "2")
@RetryableTopic(backoff = @Backoff(delay = 5000, multiplier = 2, maxDelay = 50000), attempts = "3", include = AuthenticationException.class)
public void test(ConsumerRecord<Integer, String> record) throws InterruptedException {
    // processing logic...
}

// Another approach would be to allow the creation of a specific ErrorHandler for fatal exceptions (e.g. DefaultFatalErrorHandler), similar to DefaultErrorHandler. In it, we could also define our retry policy.
@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(4500, 5);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        //processing logic...
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(AuthenticationException.class);

    return errorHandler;
}
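
To make the kind of schedule described above concrete, here is a minimal plain-Java sketch of the delays an exponential backoff would produce between authentication retries, using the delay, multiplier and maxDelay values from the @Backoff example. This is only an illustration of the idea: AuthRetryBackoffSketch, delayFor and schedule are hypothetical names, not part of spring-kafka, and the container does not currently apply anything like this to authentication errors.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a spring-kafka class: computes the wait before each auth retry.
class AuthRetryBackoffSketch {

    /** Delay before the given retry (1-based): initial * multiplier^(retry - 1), capped at max. */
    static Duration delayFor(int retry, Duration initial, double multiplier, Duration max) {
        double millis = initial.toMillis() * Math.pow(multiplier, retry - 1);
        return Duration.ofMillis((long) Math.min(millis, max.toMillis()));
    }

    /** Delays for retries 1..retries, in order. */
    static List<Duration> schedule(int retries, Duration initial, double multiplier, Duration max) {
        List<Duration> delays = new ArrayList<>();
        for (int retry = 1; retry <= retries; retry++) {
            delays.add(delayFor(retry, initial, multiplier, max));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the @Backoff example: delay = 5000, multiplier = 2, maxDelay = 50000
        System.out.println(schedule(5, Duration.ofMillis(5000), 2.0, Duration.ofMillis(50000)));
        // prints [PT5S, PT10S, PT20S, PT40S, PT50S]
    }
}
```

With a fixed authExceptionRetryInterval, every entry in that list would be the same value; the sketch only shows how a growing, capped interval would differ.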

Again, if my understanding is incorrect, please let me know.

Thank you in advance.

Comment From: leonardowestphal

We already have publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason)); which might give you an opportunity to deal with those errors in a retry manner.

I don’t see how I could use this together with a listener created with the @KafkaListener annotation. I would have to reimplement my consumers in a different way! How did you envision its usage?

Comment From: artembilan

The AuthenticationException is a special connection error that has nothing to do with the user's consumer code, which is where the retry and back-off logic is applied. Therefore, we cannot apply a similar configuration to this kind of error.

That ConsumerRetryAuthEvent has everything you need with regard to the mentioned @KafkaListener. The latter produces a ListenerContainer object in the KafkaListenerEndpointRegistry. Therefore, you can determine from that event exactly which @KafkaListener is failing to connect due to an auth error, and simply stop it from your @EventListener.
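
A rough sketch of such an @EventListener, assuming a Spring application with spring-kafka on the classpath. The class name AuthRetryEventHandler, the MAX_ATTEMPTS limit and the single-thread executor are illustrative assumptions, not anything the framework provides; the sketch counts ConsumerRetryAuthEvent occurrences per container and stops the container once the limit is reached. The stop is handed off to another thread on the assumption that the event is delivered on the consumer thread, where a blocking stop() should be avoided.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class AuthRetryEventHandler {

    // Hypothetical limit: give up after this many auth retry events per container.
    private static final int MAX_ATTEMPTS = 5;

    private final Map<String, AtomicInteger> attempts = new ConcurrentHashMap<>();

    // Daemon thread used to stop containers away from the thread that published the event.
    private final ExecutorService stopper = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "auth-retry-stopper");
        thread.setDaemon(true);
        return thread;
    });

    @EventListener
    public void onAuthRetry(ConsumerRetryAuthEvent event) {
        // For a @KafkaListener this is the container registered in the KafkaListenerEndpointRegistry.
        MessageListenerContainer container = event.getContainer(MessageListenerContainer.class);
        String listenerId = Objects.toString(container.getListenerId(), "unknown");

        int count = this.attempts.computeIfAbsent(listenerId, id -> new AtomicInteger()).incrementAndGet();
        if (count >= MAX_ATTEMPTS) {
            // event.getReason() tells AUTHENTICATION and AUTHORIZATION failures apart if needed.
            this.attempts.remove(listenerId);
            this.stopper.execute(container::stop);
        }
    }
}
```

The same place could instead raise an alert or trigger a credentials refresh; stopping the container is just the simplest reaction to show.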

Either way, this looks like a totally different story, unrelated to the original request to expose the property.

Please confirm, and we can close this as a duplicate of the mentioned issue, which has the desired fix.

Comment From: leonardowestphal

You can close this. My mistake was assuming that the property was linked to spring-kafka, which caused the topics to get mixed up... my bad.

My idea was to implement a more customized authentication retry to prevent the container from stopping and needing a restart. But for now, the retry using AuthExceptionRetryInterval meets my needs. I will take a closer look at this ConsumerRetryAuthEvent issue. Thanks!