We are in a situation where both the consumers and producers in our code use Avro, and using config only (in YAML files) has worked fine for this.

But now we have a single producer where the topic's consumers expect JSON objects serialized as strings, and we would like to be able to send messages to this topic without throwing out everything we have set up for Avro on the other topics we communicate on.

Ideally we would have liked to be able to use config to e.g. set a different serializer for a particular record type (key/value type combination) or topic. But that's not possible today.

So we have to configure producers in code.

But we would still prefer not to configure everything (bootstrap servers, schema registry, the SSL config) in code. We would like to reuse as much as possible of the existing config for this.

Googling found us this Stack Overflow answer, from someone wanting to do the same thing back in 2018: https://stackoverflow.com/a/50310843

But the API used in that example has since been deprecated and removed, so we googled on and landed on https://github.com/spring-projects/spring-boot/issues/39144

So our wish is basically: keep all existing config, but be able to set custom serializers (and possibly deserializers, but we don't need those (yet)).

Comment From: philwebb

@steinarb Can you provide a snippet of code that shows the kinds of customizations you want to apply? I'm afraid we're not all that familiar with Kafka on the Boot team.

I assume you're talking about the spring.kafka.producer.key-serializer property.

It's a bit clunky, but I wonder if something like this would work:

@Bean
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(KafkaConnectionDetails connectionDetails,
        ObjectProvider<SslBundles> sslBundles) {
    Map<String, Object> properties = this.properties.buildConsumerProperties(sslBundles.getIfAvailable());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(properties);
}

Comment From: quaff

It would be nice if serializer/deserializer could be configured per topic to override default one.

Comment From: Drezir

I think you can do that by using multiple ConsumerFactory beans (something like this). We use it that way: we have several factories, and in the KafkaListener we specify which factory to use for each topic. For producing you can specify multiple ProducerFactory beans.
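
A minimal sketch of what that could look like on the consumer side (the bean names, the topic name, and the listener are all hypothetical, and the SslBundles handling is omitted for brevity):

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class StringConsumerConfig(private val kafkaProperties: KafkaProperties) {

    // A second ConsumerFactory that overrides only the value deserializer;
    // everything else still comes from the existing YAML config.
    @Bean
    fun stringConsumerFactory(): ConsumerFactory<String, String> {
        val props: MutableMap<String, Any> =
            kafkaProperties.buildConsumerProperties(null) // pass an SslBundles if available
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
    }

    // A named container factory that @KafkaListener can select per topic
    @Bean
    fun stringListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = stringConsumerFactory()
        return factory
    }
}

// Usage in a listener, picking the factory for this one topic:
// @KafkaListener(topics = ["json-topic"], containerFactory = "stringListenerContainerFactory")
// fun onJsonMessage(message: String) { /* ... */ }
```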

Comment From: steinarb

@philwebb I've been distracted elsewhere and didn't look closely at your example but it looks very interesting!

Some issues on my side:

  1. I need this for serializer rather than deserializer (but I believe I'll be able to translate that bit 😄 )
  2. I don't see clearly how to customize the serializer property for a particular key/value type combination
  3. I'm not sure how the <?, ?> generic type expression will translate to Kotlin (I'm working on a Spring Boot application translated from Java to Kotlin; not by me, it happened before my time)

But I will try, and write back here what I find.
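
On point 3: Java's `<?, ?>` wildcards correspond to Kotlin's star projection `<*, *>`, so `DefaultKafkaConsumerFactory<?, ?>` becomes `DefaultKafkaConsumerFactory<*, *>`. A tiny, Kafka-free illustration:

```kotlin
// Star projection: accepts a Map of any key/value types, like Java's Map<?, ?>
fun entryCount(map: Map<*, *>): Int = map.size

fun main() {
    println(entryCount(mapOf("a" to 1, "b" to 2))) // prints 2
}
```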

Comment From: steinarb

@philwebb Here is the full example, translated from deserializer to serializer and translated to Kotlin (sorry about that!), with a KafkaProperties parameter added to the @Bean method:

@Configuration
class KafkaSerializerConfig {
    @Bean
    fun kafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<*, *> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<Any, Any>(properties)
    }
}

This works in setting the serializer to be StringSerializer. The problem is that it works for all producers (basically I'm replacing the serializer set in the YAML files).

What I tried was:

  1. Replace the wildcard types in the above code with String, String: Spring was then unable to create producers for the avro types at all
  2. Copy the above code once for each combination of key and value (the key is always String), changing the method names to avoid conflicts: Spring still failed at startup (but the actual error was hard to see)

If the topic names had been present in the KafkaProperties above, I could have decided to switch serializer only for one particular topic.
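
As a workaround for the missing topic names, the custom topic section could be bound with its own properties class. A sketch, assuming the kafka section shown below sits under spring.kafka (the class name is made up):

```kotlin
import org.springframework.boot.context.properties.ConfigurationProperties

// Binds the custom topic-name section of application.yaml, e.g.
//   spring.kafka.topic.transaction and spring.kafka.topic.transaction-result.
// Register it with @ConfigurationPropertiesScan or
// @EnableConfigurationProperties(TopicNames::class).
@ConfigurationProperties(prefix = "spring.kafka.topic")
data class TopicNames(
    val transaction: String = "",
    val transactionResult: String = "",
)
```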

All ideas and suggestions are welcome!

Here is the kafka section of application.yaml (the variables referred to come from Vault):

  kafka:
    security:
      protocol: "SSL"
    ssl:
      trust-store-location: file:${user.home}/projects/myapp/certs/ca-trust.jks
      trust-store-password: ${truststore_certificate_pw}
      key-store-location: file:${user.home}/projects/myapp/certs/keystore.p12
      key-store-password: ${keystore_certificate_pw}
    bootstrapServers: ${bootstrap_servers}
    groupId: consumer.local
    topic:
      transaction: ${transaction_topic}
      transactionResult: ${transaction_result_topic}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema:
          registry:
            url: ${schema_registry_url}
        basic:
          auth:
            credentials:
              source: USER_INFO
            user:
              info: ${user_credentials}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema:
          registry:
            url: ${schema_registry_url}
        basic:
          auth:
            credentials:
              source: USER_INFO
            user:
              info: ${user_credentials}
    properties:
      specific:
        avro:
          reader: true

Comment From: steinarb

I think you are able to do that by using multiple ConsumerFactory (something like this) for this purpose. We use this that way. We have more factories and in KafkaListener we specify those factories for each topic. For producing you can specify more ProducerFactory.

Hi, do you happen to have an example of using multiple ConsumerFactory implementations? 😄

In fact: I think maybe multiple ProducerFactory beans (producer, not consumer, in my case) is what I tried to do, but didn't succeed at...?

Comment From: steinarb

Ok, I've now had one step forward and one step back.

I got this error message from the spring tests that failed to start:

Unsatisfied dependency expressed through constructor parameter 9: ... Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>

"Constructor parameter 9" looked like this:

    @Autowired private val kafkaTemplate: KafkaTemplate<String, SpecificRecord>,

("SpecificRecord" is the base class of all types generated from avro schemas)

At this point in time, the KafkaSerializerConfig class looked like this (i.e. KafkaSerializerConfig.specificRecordKafkaProducerFactory returns a DefaultKafkaProducerFactory):

@Configuration
class KafkaSerializerConfig {
    @Bean
    fun specificRecordKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, SpecificRecord> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        return DefaultKafkaProducerFactory<String, SpecificRecord>(properties)
    }

    @Bean
    fun jsonKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, String> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<String, String>(properties)
    }
}

When I put breakpoints in KafkaSerializerConfig.specificRecordKafkaProducerFactory and KafkaSerializerConfig.jsonKafkaProducerFactory when running tests, the breakpoints were never called.

So I googled the error message

No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>'

and found this: https://stackoverflow.com/questions/72071783/no-qualifying-bean-of-type-org-springframework-kafka-core-producerfactoryjava

So I tried adding @RefreshScope on the method/functions, but that had no effect.

So I added @RefreshScope on the class level, i.e.

@Configuration
@RefreshScope
class KafkaSerializerConfig {
...

Then I thought I had success:

  1. Both functions of KafkaSerializerConfig were called once when running a spring test (which fires up much of the application)
  2. The tests ran green

However, when looking at it closer, I always get the avro serializer, never the StringSerializer (the test I was running ran green because it only uses avro; it had been failing because it couldn't get a KafkaTemplate with the expected signature injected).

So now I'm back to square one.

All suggestions are welcome!

Comment From: steinarb

Unless I'm fooling myself, I now have StringSerializer on the topic with key String and value String, and the default AvroSerializer for the rest.

My KafkaSerializerConfig class now looks like this:

@Configuration
@RefreshScope
class KafkaSerializerConfig {

    @Bean
    fun jsonKafkaProducerFactory(
        connectionDetails: KafkaConnectionDetails?,
        sslBundles: ObjectProvider<SslBundles?>,
        kafkaProperties: KafkaProperties
    ): DefaultKafkaProducerFactory<String, String> {
        val properties: MutableMap<String, Any> = kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable())
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory<String, String>(properties)
    }

    @Bean
    fun jsonKafkaTemplate(
        producerFactoryWithStringSerializer: ProducerFactory<String, String>
    ): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactoryWithStringSerializer)
    }
}

I.e.:

  1. I only have @Bean overrides for <String,String> (all other key/value combos get the default defined in application.yaml, which is AvroSerializer)
  2. I override both DefaultKafkaProducerFactory<String,String> (to set the serializer) and KafkaTemplate<String,String> (if I don't have this override, then the <String,String> producer gets AvroSerializer)
  3. The @RefreshScope annotation must be present for this to work (if it isn't present, spring startup fails because it can't create and inject KafkaTemplate<?,?> for anything other than <String,String>)

I note that this is similar to the suggested solution here (except I use config instead of setting the serializer in code, and I don't do anything in the explicit KafkaTemplate function except create the template): https://stackoverflow.com/a/50310843
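
With those beans in place, both templates can be injected side by side, resolved by their generic signatures. A sketch of the usage, assuming the auto-configured avro template still resolves as shown earlier (the service class, function names, and topic parameters are hypothetical):

```kotlin
import org.apache.avro.specific.SpecificRecord
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class MessageSender(
    // Matches the jsonKafkaTemplate bean: value serializer is StringSerializer
    private val jsonKafkaTemplate: KafkaTemplate<String, String>,
    // Matches the default template: value serializer is KafkaAvroSerializer from YAML
    private val avroKafkaTemplate: KafkaTemplate<String, SpecificRecord>,
) {
    fun sendJson(topic: String, key: String, json: String) {
        jsonKafkaTemplate.send(topic, key, json)
    }

    fun sendAvro(topic: String, key: String, record: SpecificRecord) {
        avroKafkaTemplate.send(topic, key, record)
    }
}
```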

Comment From: rajadilipkolli

I think you are able to do that by using multiple ConsumerFactory (something like this) for this purpose. We use this that way. We have more factories and in KafkaListener we specify those factories for each topic. For producing you can specify more ProducerFactory.

Hi, do you happen to have an example of using multiple ConsumerFactory implementations? 😄

In fact: I think maybe multiple ProducerFactory beans (producer, not consumer, in my case) is what I tried to do, but didn't succeed at...?

Perhaps you are looking at something like this sample https://github.com/rajadilipkolli/kafka-experiments/tree/main/kafka-spring-boot/boot-multiple-producers-consumers

Comment From: thecooldrop

Okay, so I think I got the gist of the issue, and I would like to work on it.

What I understood is that @steinarb wants to be able to say, per topic, which key-serializer and value-serializer should be used. For example, it should be possible to do something like the following configuration:

  kafka:
    topic:
      transaction: ${transaction_topic}
      transactionResult: ${transaction_result_topic}
    producer:
      default-key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      default-value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      topic:
      - name: transaction
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema:
          registry:
            url: ${schema_registry_url}
        basic:
          auth:
            credentials:
              source: USER_INFO
            user:
              info: ${user_credentials}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema:
          registry:
            url: ${schema_registry_url}
        basic:
          auth:
            credentials:
              source: USER_INFO
            user:
              info: ${user_credentials}

This should then configure the application so that a producer sending to the topic transaction uses the serializers configured in kafka.producer.topic, and if a topic is not named in kafka.producer.topic, the default serializers defined in kafka.producer.default-*-serializer are used.
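
A rough sketch of how those proposed properties could resolve per-topic overrides against the defaults (entirely hypothetical; nothing like this exists in KafkaProperties today):

```kotlin
// Hypothetical model for the proposed config: per-topic serializer overrides
// falling back to kafka.producer.default-*-serializer.
data class TopicSerializerOverride(
    val name: String,
    val keySerializer: String? = null,
    val valueSerializer: String? = null,
)

data class ProducerSerializerProperties(
    val defaultKeySerializer: String,
    val defaultValueSerializer: String,
    val topic: List<TopicSerializerOverride> = emptyList(),
) {
    // Returns the (key, value) serializer class names for a topic,
    // using the defaults for anything the override leaves unset.
    fun serializersFor(topicName: String): Pair<String, String> {
        val match = topic.firstOrNull { it.name == topicName }
        return (match?.keySerializer ?: defaultKeySerializer) to
            (match?.valueSerializer ?: defaultValueSerializer)
    }
}

fun main() {
    val props = ProducerSerializerProperties(
        defaultKeySerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer",
        defaultValueSerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer",
        topic = listOf(
            TopicSerializerOverride(
                name = "transaction",
                keySerializer = "org.apache.kafka.common.serialization.StringSerializer",
            )
        ),
    )
    println(props.serializersFor("transaction").first)
    // prints org.apache.kafka.common.serialization.StringSerializer
}
```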

Is my understanding correct @steinarb ?

In other news, this sounds like implementing custom Kafka factories for consumers and producers, which currently live in the Spring for Apache Kafka project. Is additional design/planning discussion needed here, @philwebb? I would be thrilled to take part in those discussions.