We are in a situation where we have both consumers and producers in the code that all use Avro, and configuring this through YAML files alone has worked fine.
But now we have a single producer whose topic's consumer expects JSON objects serialized as strings, and we would like to be able to send messages to this topic without throwing out everything we have set up for Avro on the other topics we communicate on.
Ideally we would have liked to be able to use config to e.g. set a different serializer for a particular record type (key/message type combination) or topic. But that isn't possible today.
So we have to configure producers in code.
But we would still prefer not to configure everything (bootstrap server, schema registry, the SSL config) in code; we would like to reuse as much of the existing config as possible.
Googling found us this Stack Overflow answer, from someone wanting to do the same thing back in 2018: https://stackoverflow.com/a/50310843
But the API used in that example has been deprecated and removed, so we googled on and landed on https://github.com/spring-projects/spring-boot/issues/39144
So our wish is basically: keep all the config, but be able to set custom serializers (and possibly deserializers, but we don't need those (yet)).
Comment From: philwebb
@steinarb Can you provide a snippet of code that shows the kinds of customizations you want to apply? I'm afraid we're not all that familiar with Kafka on the Boot team.
I assume you're talking about the spring.kafka.producer.key-serializer property.
It's a bit clunky, but I wonder if something like this would work:
@Bean
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(KafkaConnectionDetails connectionDetails,
        ObjectProvider<SslBundles> sslBundles) {
    // 'this.properties' is an injected KafkaProperties field;
    // MyDeserializer stands in for whatever custom deserializer you need
    Map<String, Object> properties = this.properties.buildConsumerProperties(sslBundles.getIfAvailable());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(properties);
}
Comment From: quaff
It would be nice if the serializer/deserializer could be configured per topic, overriding the default one.
Comment From: Drezir
I think you can do that by using multiple ConsumerFactory beans (something like this). We use it that way: we have several factories, and in the KafkaListener we specify the factory for each topic. For producing you can define multiple ProducerFactory beans.
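Roughly like this (a quick sketch, untested; bean, class, and topic names are invented for illustration, and it assumes the buildConsumerProperties(SslBundles) variant used elsewhere in this thread):

import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.ObjectProvider
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.ssl.SslBundles
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@Configuration
class MultipleListenerFactoriesConfig {
    // Default factory: Avro deserialization, taken straight from the YAML config
    @Bean
    fun avroListenerContainerFactory(
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> {
        val props: MutableMap<String, Any> = kafkaProperties.buildConsumerProperties(sslBundles.getIfAvailable())
        val factory = ConcurrentKafkaListenerContainerFactory<String, SpecificRecord>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<String, SpecificRecord>(props))
        return factory
    }

    // Second factory: same connection and SSL config, but plain String values
    @Bean
    fun stringListenerContainerFactory(
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val props: MutableMap<String, Any> = kafkaProperties.buildConsumerProperties(sslBundles.getIfAvailable())
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<String, String>(props))
        return factory
    }
}

@Component
class JsonTopicListener {
    // The containerFactory attribute selects the factory by bean name
    @KafkaListener(topics = ["some-json-topic"], containerFactory = "stringListenerContainerFactory")
    fun onJsonMessage(message: String) {
        // handle the JSON string
    }
}

Each @KafkaListener then points at the factory it needs, and the rest of the connection details still come from the YAML config.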
Comment From: steinarb
@philwebb I've been distracted elsewhere and didn't look closely at your example, but it looks very interesting!
Some issues on my side:
- I need this for a serializer rather than a deserializer (but I believe I'll be able to translate that bit 😄 )
- I don't see clearly how to customize the serializer property for a particular key/value combination
- I'm not sure how the <?, ?> generic type expression will translate to Kotlin (I'm working on a Spring Boot application translated from Java to Kotlin, which happened before my time)
But I will try, and write back here what I find.
Comment From: steinarb
@philwebb Here is the full example, translated from deserializer to serializer and from Java to Kotlin (sorry about that!), with a KafkaProperties parameter added to the @Bean method:
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.ObjectProvider
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.ssl.SslBundles
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory

@Configuration
class KafkaSerializerConfig {
    @Bean
    fun kafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<*, *> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<Any, Any>(properties)
    }
}
This works in setting the serializer to StringSerializer. The problem is that it does so for all producers (basically I'm replacing the serializer set in the YAML files).
What I tried was:
- Replacing the wildcard types in the above code with String, String: Spring was then unable to create producers for the Avro types at all
- Making a copy of the above code for each combination of key and value (the key is always String), with different method names to avoid conflicts: Spring still failed at startup (but what the actual error was, was hard to see)
If the topic had been present in the KafkaProperties above, I could have decided to switch serializers for one particular topic only.
All ideas and suggestions are welcome!
Here is the kafka section of application.yaml (the variables referred to come from Vault):
kafka:
  security:
    protocol: "SSL"
  ssl:
    trust-store-location: file:${user.home}/projects/myapp/certs/ca-trust.jks
    trust-store-password: ${truststore_certificate_pw}
    key-store-location: file:${user.home}/projects/myapp/certs/keystore.p12
    key-store-password: ${keystore_certificate_pw}
  bootstrapServers: ${bootstrap_servers}
  groupId: consumer.local
  topic:
    transaction: ${transaction_topic}
    transactionResult: ${transaction_result_topic}
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    properties:
      schema:
        registry:
          url: ${schema_registry_url}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${user_credentials}
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema:
        registry:
          url: ${schema_registry_url}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${user_credentials}
  properties:
    specific:
      avro:
        reader: true
Comment From: steinarb
I think you can do that by using multiple ConsumerFactory beans (something like this). We use it that way: we have several factories, and in the KafkaListener we specify the factory for each topic. For producing you can define multiple ProducerFactory beans.

Hi, do you happen to have an example of using multiple ConsumerFactory implementations? 😄

In fact, I think multiple ProducerFactory beans (producer, not consumer, in my case) may be what I tried and didn't succeed at...?
Comment From: steinarb
OK, I've now taken one step forward and one step back.
I got this error message from the Spring tests that failed to start:
Unsatisfied dependency expressed through constructor parameter 9: ... Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>
"Constructor parameter 9" looked like this:
@Autowired private val kafkaTemplate: KafkaTemplate<String, SpecificRecord>,
("SpecificRecord" is the base class of all types generated from avro schemas)
At this point in time, the KafkaSerializerConfig class looked like this (i.e. KafkaSerializerConfig.specificRecordKafkaProducerFactory returns a KafkaProducerFactory
// imports as in the previous example, plus org.apache.avro.specific.SpecificRecord
@Configuration
class KafkaSerializerConfig {
    // Factory for the existing Avro producers
    @Bean
    fun specificRecordKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, SpecificRecord> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        return DefaultKafkaProducerFactory<String, SpecificRecord>(properties)
    }

    // Factory for the JSON-as-string producer
    @Bean
    fun jsonKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, String> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<String, String>(properties)
    }
}
When I put breakpoints in KafkaSerializerConfig.specificRecordKafkaProducerFactory and KafkaSerializerConfig.jsonKafkaProducerFactory and ran the tests, the breakpoints were never hit.
So I googled the error message
No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>'
and found this: https://stackoverflow.com/questions/72071783/no-qualifying-bean-of-type-org-springframework-kafka-core-producerfactoryjava
So I tried adding @RefreshScope on the bean methods, but that had no effect.
So I added @RefreshScope at the class level, i.e.
@Configuration
@RefreshScope
class KafkaSerializerConfig {
...
Then I thought I had success:
1. Both functions of KafkaSerializerConfig were called once when running a Spring test (which fires up much of the application)
2. The tests ran green
However, when looking at it more closely, I always got the Avro serializer, never the StringSerializer (the test I was running went green because it only uses Avro; it had been failing earlier because it couldn't get a KafkaTemplate with the expected signature injected).
So now I'm back to square one.
All suggestions are welcome!
Comment From: steinarb
Unless I'm fooling myself, I now have StringSerializer on the topic with key String and value String, and the default Avro serializer for the rest.
My KafkaSerializerConfig class now looks like this:
// imports as in the previous examples, plus org.springframework.kafka.core.KafkaTemplate,
// org.springframework.kafka.core.ProducerFactory,
// and org.springframework.cloud.context.config.annotation.RefreshScope
@Configuration
@RefreshScope
class KafkaSerializerConfig {
    // Factory for the JSON-as-string producer; all other key/value combinations
    // keep the auto-configured factory built from application.yaml
    @Bean
    fun jsonKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, String> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<String, String>(properties)
    }

    // Without this explicit template, the <String, String> producer gets the Avro serializer
    @Bean
    fun jsonKafkaTemplate(
        producerFactoryWithStringSerializer: ProducerFactory<String, String>
    ): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactoryWithStringSerializer)
    }
}
I.e.
1. I only have @Bean overrides for <String, String> (all other key/value combos get the default defined in application.yaml, which is the Avro serializer)
2. I override both DefaultKafkaProducerFactory<String, String> (to set the serializer) and KafkaTemplate<String, String> (if I don't have this override, the <String, String> producer gets the Avro serializer)
3. The @RefreshScope annotation must be present for this to work (if it isn't present, Spring startup fails because it can't create and inject KafkaTemplate<?, ?> for anything other than <String, String>)
I note that this is similar to the solution suggested here (except that I use config instead of setting the serializer in code, and I don't do anything in the explicit KafkaTemplate).
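For completeness, injection then looks something like this (a sketch; the service class and topic names are invented):

import org.apache.avro.specific.SpecificRecord
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class MessagePublisher(
    // the explicit <String, String> template defined above, with StringSerializer
    private val jsonKafkaTemplate: KafkaTemplate<String, String>,
    // the auto-configured template, still using the Avro serializer from application.yaml
    private val kafkaTemplate: KafkaTemplate<String, SpecificRecord>
) {
    fun publishJson(key: String, json: String) {
        jsonKafkaTemplate.send("some-json-topic", key, json)
    }

    fun publishRecord(key: String, record: SpecificRecord) {
        kafkaTemplate.send("some-avro-topic", key, record)
    }
}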
Comment From: rajadilipkolli
I think you can do that by using multiple ConsumerFactory beans (something like this). We use it that way: we have several factories, and in the KafkaListener we specify the factory for each topic. For producing you can define multiple ProducerFactory beans.

Hi, do you happen to have an example of using multiple ConsumerFactory implementations? 😄

In fact, I think multiple ProducerFactory beans (producer, not consumer, in my case) may be what I tried and didn't succeed at...?
Perhaps you are looking for something like this sample: https://github.com/rajadilipkolli/kafka-experiments/tree/main/kafka-spring-boot/boot-multiple-producers-consumers
Comment From: thecooldrop
Okay, so I think I've got the gist of the issue, and I would like to work on it.
What I understand is that @steinarb wants to be able to specify, per topic, which key-serializer and value-serializer should be used. For example, it should be possible to write something like the following configuration:
kafka:
  topic:
    transaction: ${transaction_topic}
    transactionResult: ${transaction_result_topic}
  producer:
    default-key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    default-value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    topic:
      - name: transaction
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    properties:
      schema:
        registry:
          url: ${schema_registry_url}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${user_credentials}
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema:
        registry:
          url: ${schema_registry_url}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${user_credentials}
This would configure the application so that a producer sending to the transaction topic uses the serializers configured under kafka.producer.topic, and any topic not named under kafka.producer.topic falls back to the default serializers defined by kafka.producer.default-*-serializer.
Is my understanding correct @steinarb ?
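For binding purposes, the proposed structure could map onto a properties class roughly like this (a sketch, not an existing Spring Boot API; the kafka.producer prefix and class names just follow the YAML above):

import org.springframework.boot.context.properties.ConfigurationProperties

// Hypothetical binding for the proposed per-topic serializer config;
// would need to be registered, e.g. via @ConfigurationPropertiesScan
@ConfigurationProperties("kafka.producer")
data class PerTopicSerializerProperties(
    val defaultKeySerializer: Class<*>? = null,
    val defaultValueSerializer: Class<*>? = null,
    val topic: List<TopicSerializers> = emptyList()
) {
    data class TopicSerializers(
        val name: String,
        val keySerializer: Class<*>? = null,
        val valueSerializer: Class<*>? = null
    )
}

Boot could then build one ProducerFactory per entry and fall back to the defaults for unnamed topics, which is more or less what @steinarb ended up wiring by hand above.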
In other news, this sounds like implementing custom Kafka factories for consumers and producers, which currently live in Spring for Apache Kafka. Is additional design/planning discussion needed here @philwebb ? I would be thrilled to take part in these discussions.