I want to configure my container to
- provide authentication
- enable observability for the listener
- obey the spring configuration e.g. spring.kafka.consumer.auto-offset-reset: earliest for testing with embedded kafka
I found this solution
@Bean
fun kafkaListenerContainerFactory(
consumerFactory: ConsumerFactory<Any, Any>,
configurer: ConcurrentKafkaListenerContainerFactoryConfigurer
): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
configurer.configure(factory, consumerFactory)
factory.setContainerCustomizer {
it.containerProperties.isObservationEnabled = true
it.containerProperties.kafkaConsumerProperties = properites2()
}
return factory
}
So basically I just want to set the containerCustomizer But I think it could be achieved easier if the KafkaAnnotationDrivenConfiguration would take a ContainerCustomizer into account.
This should imho be sufficient to achieve the goal?
@Bean
fun containerCustomizer() = object: ContainerCustomizer<Any, Any, ConcurrentMessageListenerContainer<Any, Any>> {
override fun configure(container: ConcurrentMessageListenerContainer<Any, Any>) {
container.containerProperties.isObservationEnabled = true
container.containerProperties.kafkaConsumerProperties = properties()
}
}
I want to mess around with the factories as little as possible to not break the default configuration.
Comment From: mhalbritter
Superseded by #34033