Version: 2.3.0.RELEASE
I want to implement a RecordFilterStrategy, But did not take effect
@Component
public class RecordFilter implements RecordFilterStrategy{
@Override
public boolean filter(ConsumerRecord consumerRecord) {
// do something
return false;
}
}
Is it possible to modify org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory
like this
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy) { // new add
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
recordFilterStrategy.ifAvailable(factory::setRecordFilterStrategy); // new add
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}
other issues in spring-kafka project https://github.com/spring-projects/spring-kafka/issues/1535#issue-658024709
Comment From: wilkinsona
Yes, I think that makes sense. Thanks for the suggestion.
Comment From: snicoll
Closing in favour of PR #22973