Version: 2.3.0.RELEASE

I implemented a RecordFilterStrategy and registered it as a bean, but it does not take effect:

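import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Component;

@Component
public class RecordFilter implements RecordFilterStrategy<Object, Object> {

    @Override
    public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
        // do something
        // Returning true discards the record; returning false passes it to the listener.
        return false;
    }
}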

Would it be possible to modify org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory so that it picks up a RecordFilterStrategy bean, like this?

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
        ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy) { // newly added parameter
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    recordFilterStrategy.ifAvailable(factory::setRecordFilterStrategy); // newly added: apply the filter if one is defined
    configurer.configure(factory, kafkaConsumerFactory
            .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    return factory;
}

Related issue in the spring-kafka project: https://github.com/spring-projects/spring-kafka/issues/1535#issue-658024709
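
Without a change to the auto-configuration, one possible workaround is to declare the container factory in application code and set the filter there. The following is only a sketch: the class name KafkaFilterConfig is hypothetical, and it assumes the auto-configured ConcurrentKafkaListenerContainerFactoryConfigurer and ConsumerFactory beans can be injected as shown. Because the bean is named kafkaListenerContainerFactory, the @ConditionalOnMissingBean condition quoted above should make the auto-configured factory back off.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaFilterConfig {

    // The bean name matches the one checked by @ConditionalOnMissingBean
    // in KafkaAnnotationDrivenConfiguration, so this definition takes precedence.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            RecordFilterStrategy<Object, Object> recordFilterStrategy) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the spring.kafka.* settings in the same way as the auto-configuration.
        configurer.configure(factory, kafkaConsumerFactory);
        // Register the filter; records for which filter(...) returns true are discarded.
        factory.setRecordFilterStrategy(recordFilterStrategy);
        return factory;
    }
}
```

This assumes exactly one RecordFilterStrategy bean is defined (for example the RecordFilter component above); with none or several, the injection would fail at startup.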

Comment From: wilkinsona

Yes, I think that makes sense. Thanks for the suggestion.

Comment From: snicoll

Closing in favour of PR #22973