I'm using Spring Boot v2.7.2 and the latest version of Spring Kafka provided by spring-boot-dependencies:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

I want the app to load all configuration from application.yaml, hence I created the beans with this bare minimum configuration:

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<Integer, FileUploadEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(Collections.emptyMap());
    }

    @Bean
    public KafkaTemplate<Integer, FileUploadEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

It seems to work as expected and loads the configuration from the application.yaml below:

spring:
  application:
    name: my-app
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      client-id: ${spring.application.name}
  #    transaction-id-prefix: "tx-"
    template:
      default-topic: my-topic

However, if I uncomment the transaction-id-prefix line, the application fails to start with the exception java.lang.IllegalArgumentException: The 'ProducerFactory' must support transactions.
The documentation here reads:

If you provide a custom producer factory, it must support transactions. See ProducerFactory.transactionCapable().

The only way I have found to make it work is adding the transaction prefix programmatically as shown in the snippet below:

@Bean
public ProducerFactory<Integer, FileUploadEvent> fileUploadProducerFactory() {
    var pf = new DefaultKafkaProducerFactory<Integer, FileUploadEvent>(Collections.emptyMap());
    pf.setTransactionIdPrefix("tx-");
    return pf;
}

Any thoughts on how I can configure everything using application properties file?

Comment From: snicoll

I don't really understand what you're trying to do. Why are you setting a transaction id prefix on something that doesn't support transactions?

Comment From: wilkinsona

AIUI, the intent is that transactions should be used and that the ProducerFactory should support them. The transaction-id-prefix property can be set and this results in the auto-configuration of the kafkaTransactionManager bean. However, if you define your own ProducerFactory (to constrain the types, for example) there's no built-in way to have the transaction-id-prefix applied to that ProducerFactory.

@dbaltor One option could be to use @Value to inject the transaction ID prefix:

@Bean
public ProducerFactory<Integer, FileUploadEvent> fileUploadProducerFactory(
        @Value("${spring.kafka.producer.transaction-id-prefix}") String transactionIdPrefix) {
    var pf = new DefaultKafkaProducerFactory<Integer, FileUploadEvent>(Collections.emptyMap());
    pf.setTransactionIdPrefix(transactionIdPrefix);
    return pf;
}

Comment From: dbaltor

Thanks @wilkinsona for the suggestion. This is exactly what I'm doing, including injecting spring.kafka.producer.transaction-id-prefix to avoid setting the same config in two places, but it feels like a workaround. It kind of undermines the statement below (source):

With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix property - Boot will automatically configure a KafkaTransactionManager bean and wire it into the listener container.

I think it would be great if the framework could use the configuration loaded from the properties file to set the ProducerFactory properly.

Comment From: wilkinsona

I think it would be great if the framework could use the configuration loaded from the properties file to set the ProducerFactory properly

We can't do that I'm afraid. It's a fundamental principle of auto-configuration that it backs off when a user defines a bean of their own. If we post-processed the user's bean to change its configuration, it would no longer be possible for your code to take complete control of how things are configured. Unfortunately, this flexibility does sometimes require you to write a little bit more code. This is one such time.
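The back-off principle can be sketched with a toy registry in plain Java (purely illustrative, not Boot's actual mechanism; `BeanRegistry` and its method names are invented for this sketch): auto-configuration only contributes a default when the user has not already supplied a bean, so it never rewrites a user-defined bean.

```java
import java.util.HashMap;
import java.util.Map;

public class BeanRegistry {

    private final Map<String, Object> beans = new HashMap<>();

    // User-defined beans are registered first and always win.
    public void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Auto-configuration contributes a default only when no bean
    // with that name exists yet: it "backs off" otherwise.
    public void autoConfigure(String name, Object defaultBean) {
        beans.putIfAbsent(name, defaultBean);
    }

    public Object get(String name) {
        return beans.get(name);
    }

    public static void main(String[] args) {
        BeanRegistry registry = new BeanRegistry();
        registry.register("producerFactory", "user-defined factory");
        registry.autoConfigure("producerFactory", "boot default factory");
        // The user's bean wins; auto-configuration backed off.
        System.out.println(registry.get("producerFactory")); // prints "user-defined factory"
    }
}
```

Because the user's bean is never post-processed, the framework cannot quietly apply the transaction-id-prefix to it; the trade-off is that the user keeps full control of the bean's configuration.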

You may want to open a Spring Kafka issue to see if they think an update should be made to the documentation. /cc @garyrussell

Comment From: dbaltor

I see what you're saying. It's a catch-22 situation. Thank you very much for the clarifications and for the snippet as well. I will get in touch with the Spring Kafka team and suggest that this case, including your code, be mentioned in the docs.

Comment From: garyrussell

Due to type erasure, it is not necessary to define your own beans to constrain the types; you can simply use Boot's auto-configured beans.

Boot creates the template as <Object, Object>, but you can autowire it (or use it as a parameter to a bean factory method) as <Integer, FileUploadEvent> (or anything you want).

Caveat: this does not work with native code/GraalVM.

Furthermore, the generic types on KafkaTemplate are only syntactic sugar for all methods except the receive() methods; they have no runtime or compile-time benefit. It is perfectly OK to use it as KafkaTemplate<Object, Object>.
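The erasure behavior described above can be illustrated in plain Java, outside Spring (a toy sketch; the `narrow` helper is invented for the illustration): because type parameters do not exist at runtime, references with different generic types can point at the same object, which is why Boot's KafkaTemplate<Object, Object> bean can be injected under a narrower parameterization.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical helper: "narrows" a List<Object> reference to List<V>.
    // This compiles (with an unchecked warning) and works at runtime
    // because generic type parameters are erased.
    @SuppressWarnings("unchecked")
    static <V> List<V> narrow(List<Object> raw) {
        return (List<V>) (List<?>) raw;
    }

    public static void main(String[] args) {
        List<Object> asObjects = new ArrayList<>();
        List<String> asStrings = narrow(asObjects);

        asStrings.add("hello");

        // Both references point at the very same list instance.
        System.out.println(asObjects.get(0)); // prints "hello"
        System.out.println(asObjects.getClass() == asStrings.getClass()); // prints "true"
    }
}
```

As noted in the caveat, this relies on erasure at runtime, so it does not carry over to native images built with GraalVM.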

Comment From: dbaltor

That's very interesting. Thank you @garyrussell ! At this time, I think I do need to create my own beans, as I'm using bespoke (de)serializers in both my producer and consumer factories.