Hello,

First of all, the new @ServiceConnection and related annotations are a great addition for running integration tests with Testcontainers. At the company I work for, I have created a Spring Boot Starter which is used for configuring Spring Kafka. The main autoconfiguration class kicks in with:

@AutoConfiguration(after = TaskSchedulingAutoConfiguration.class)
@EnableConfigurationProperties(StreamingKafkaProperties.class)
@ConditionalOnClass(name = "org.springframework.kafka.annotation.EnableKafka")
@ConditionalOnProperty(value = "mm.kafka.enabled", havingValue = "true")
@Import({
    StreamingKafkaSASLAutoConfiguration.class,
    StreamingKafkaProducerAutoConfiguration.class,
    StreamingKafkaConsumerAutoConfiguration.class,
    StreamingKafkaRetryAutoConfiguration.class,
    KafkaMessageService.class,
    AlertingMessageCreator.class,
    AlertingProducerService.class
})
public class StreamingKafkaAutoConfiguration {

We are doing integration tests with Testcontainers, and before this feature was introduced we used the following to get spring.kafka.bootstrap-servers injected into the Spring test context:

  @Container
  private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(TESTCONTAINERS_KAFKA_IMAGE))
      .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "FALSE");

  @DynamicPropertySource
  static void setProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
  }

I wanted to switch to using @ServiceConnection and the code changed to:

  @Container
  @KafkaServiceConnection
  private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(TESTCONTAINERS_KAFKA_IMAGE))
      .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "FALSE");

The problem is that with this approach, the @KafkaServiceConnection code kicks in after the code in my Spring Boot Starter, and this results in "spring.kafka.bootstrap-servers" always being set to the default value of "PLAINTEXT://localhost:9092". The only way I've found to make this work is to add back the @DynamicPropertySource code, since that injects the Testcontainers Kafka container port into my autoconfiguration classes.

Is there a more elegant solution to this? Am I not using the feature as intended? As I understand it, the whole point of @KafkaServiceConnection is to avoid @DynamicPropertySource altogether.

And btw, switching back to just using @DynamicPropertySource doesn't work either now because of https://github.com/spring-projects/spring-boot/issues/34770

Comment From: quaff

You should inject ObjectProvider<KafkaConnectionDetails> connectionDetails alongside KafkaProperties properties, like this: https://github.com/spring-projects/spring-boot/blob/fd9b8fe0205d89ca6253755dcfcd5b14c2e7c837/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java#L81-L85

I think it would be better if Spring Boot exposed getConnectionDetails() on all related auto-configurations; then you could just inject KafkaAutoConfiguration configuration and use configuration.getConnectionDetails() to get the actual connection properties. WDYT @wilkinsona
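The getIfAvailable fallback in that linked constructor can be sketched in plain Java. The types below are simplified stand-ins for the real Spring ones (ObjectProvider is reduced to a nullable bean plus a Supplier), so this illustrates the pattern rather than Spring Boot's actual code:

```java
import java.util.List;
import java.util.function.Supplier;

public class ConnectionDetailsFallback {

    // Simplified stand-in for Spring Boot's KafkaConnectionDetails interface
    interface KafkaConnectionDetails {
        List<String> getBootstrapServers();
    }

    // Mimics ObjectProvider<T>.getIfAvailable(Supplier<T>): use the bean that
    // was contributed (e.g. by a @ServiceConnection-annotated container) if
    // there is one, otherwise fall back to the properties-based adapter.
    static KafkaConnectionDetails resolve(KafkaConnectionDetails contributed,
            Supplier<KafkaConnectionDetails> propertiesFallback) {
        return (contributed != null) ? contributed : propertiesFallback.get();
    }

    public static void main(String[] args) {
        KafkaConnectionDetails fromProperties = () -> List.of("localhost:9092");
        KafkaConnectionDetails fromContainer = () -> List.of("localhost:32789");

        // No service connection present: the properties adapter wins
        System.out.println(resolve(null, () -> fromProperties).getBootstrapServers());
        // A container contributed connection details: they take precedence
        System.out.println(resolve(fromContainer, () -> fromProperties).getBootstrapServers());
    }
}
```

Because the resolution happens per injection point rather than via a property, the container's details win regardless of when the starter's auto-configuration runs.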

Comment From: wilkinsona

Thanks for opening a separate issue, @jucosorin.

Can you provide an example of what your auto-configuration's doing at the moment? It sounds like it may be injecting KafkaProperties. If so, that's something that we don't really support at the moment:

The properties that map to @ConfigurationProperties classes available in Spring Boot, which are configured through properties files, YAML files, environment variables, and other mechanisms, are public API but the accessors (getters/setters) of the class itself are not meant to be used directly.

The introduction of service connections and interfaces like KafkaConnectionDetails is a step towards improving this and formalising things. The auto-configurations now use KafkaConnectionDetails rather than KafkaProperties for connection-related settings. When there's no KafkaConnectionDetails defined by something like @KafkaServiceConnection, this is done by adapting KafkaProperties into KafkaConnectionDetails. The adaptation is an implementation detail of the auto-configuration at the moment, through the package-private PropertiesKafkaConnectionDetails. It feels like there may be some benefit to defining a PropertiesKafkaConnectionDetails bean when there's no existing KafkaConnectionDetails bean, as this would allow anything to inject KafkaConnectionDetails without having to worry about where those details are coming from. Before we consider going down this route, I'd like to have a better understanding of what your custom auto-configuration does at the moment.
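The adaptation described here can be illustrated with a small plain-Java sketch. KafkaProps and the nested KafkaConnectionDetails interface are simplified stand-ins for the real Spring Boot types (the actual PropertiesKafkaConnectionDetails covers more settings than just the bootstrap servers):

```java
import java.util.List;

public class PropertiesAdapterSketch {

    // Stand-in for Spring Boot's KafkaProperties configuration-properties class
    static class KafkaProps {
        private final List<String> bootstrapServers;

        KafkaProps(List<String> bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }

        List<String> getBootstrapServers() {
            return bootstrapServers;
        }
    }

    // Stand-in for the KafkaConnectionDetails interface
    interface KafkaConnectionDetails {
        List<String> getBootstrapServers();
    }

    // The adaptation: wrap the bound properties behind the connection-details
    // interface, so consumers of KafkaConnectionDetails never need to know
    // whether the values came from application.yml or from a container.
    static KafkaConnectionDetails adapt(KafkaProps properties) {
        return properties::getBootstrapServers;
    }
}
```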

Comment From: jucosorin

@quaff Thank you for your idea. Yes, I think the solution would be for me to inject a KafkaConnectionDetails into my auto-configuration and use its getBootstrapNodes() instead of relying on an injected KafkaProperties.

@wilkinsona The Spring Boot Starter I've developed automatically creates a Spring Kafka ConcurrentKafkaListenerContainerFactory by reading properties from application.yml.

The Starter has a main @AutoConfiguration class that looks like this (it actually does much more, but for brevity I've only included the consumer part, which I think is enough to give you an idea):

@AutoConfiguration(after = TaskSchedulingAutoConfiguration.class)
@EnableConfigurationProperties(StreamingKafkaProperties.class)
@ConditionalOnClass(name = "org.springframework.kafka.annotation.EnableKafka")
@ConditionalOnProperty(value = "mm.kafka.enabled", havingValue = "true")
@Import({
    StreamingKafkaConsumerAutoConfiguration.class,
})
public class StreamingKafkaAutoConfiguration {

}

The above in turn imports the real StreamingKafkaConsumerAutoConfiguration auto-configuration, and this is where I need to get the bootstrap.servers property:

@AutoConfiguration
@ConditionalOnProperty(value = "mm.kafka.consumer.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties({KafkaConsumerProperties.class, KafkaProperties.class})
@Slf4j
public class StreamingKafkaConsumerAutoConfiguration {

  @Bean
  @ConditionalOnMissingBean
  ConsumerFactory<Object, Object> consumerFactory(KafkaConsumerProperties consumerProperties, KafkaProperties kafkaProperties) {
    Map<String, Object> configs = new HashMap<>(kafkaProperties.buildConsumerProperties());

    configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.groupId());
    configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerProperties.sessionTimeout());
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, consumerProperties.maxPollInterval());
    configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, consumerProperties.requestTimeout());

    configs.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, consumerProperties.keyDeserializer());
    configs.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, consumerProperties.valueDeserializer());

    configs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, consumerProperties.schemaUrl());
    //Use Specific Record or else you get Avro GenericRecord in the consumer payload
    configs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

    return new DefaultKafkaConsumerFactory<>(configs);
  }

  @Bean
  @ConditionalOnMissingBean
  ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory,
      KafkaConsumerProperties consumerProperties) {
    ConcurrentKafkaListenerContainerFactory<?, ?> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(consumerProperties.concurrency());
    factory.getContainerProperties().setPollTimeout(consumerProperties.pollTimeout());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setContainerCustomizer(container -> container.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10L)));

    return factory;
  }
}

Now, KafkaConsumerProperties is my own configuration properties class, but you're right in that KafkaProperties is Spring Boot's own configuration properties class.

Using @DynamicPropertySource would inject the correct bootstrap.servers property at runtime and my Testcontainers tests would work just fine. Now that I cannot use that because of #34770, I need to use @KafkaServiceConnection, but it kicks in later than my auto-configuration and I'm left with the default "localhost:9092" value for the bootstrap.servers property.

One solution would be, as @quaff suggested, to inject a KafkaConnectionDetails into my auto-configuration and use its getBootstrapNodes() instead of relying on an injected KafkaProperties, doing something like:

private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties) {
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          nodesToStringList(this.connectionDetails.getConsumerBootstrapNodes()));
  if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
      properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
  }
}
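nodesToStringList isn't shown above; judging by its use, it renders each bootstrap node as host:port, the format Kafka clients expect for bootstrap.servers. A self-contained sketch, with a hypothetical Node record standing in for the real type:

```java
import java.util.List;
import java.util.stream.Collectors;

public class NodesHelper {

    // Hypothetical stand-in for the node type exposed by KafkaConnectionDetails
    record Node(String host, int port) {}

    // Renders each node as "host:port", joining the pieces Kafka clients
    // expect for the bootstrap.servers client property
    static List<String> nodesToStringList(List<Node> nodes) {
        return nodes.stream()
                .map(node -> node.host() + ":" + node.port())
                .collect(Collectors.toList());
    }
}
```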

This would work fine in integration tests, where I would have a KafkaContainerConnectionDetails injected, but in production code I would need access to PropertiesKafkaConnectionDetails in order to do exactly what you're doing in the KafkaAutoConfiguration class:

KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<KafkaConnectionDetails> connectionDetails) {
  this.properties = properties;
  this.connectionDetails = connectionDetails
      .getIfAvailable(() -> new PropertiesKafkaConnectionDetails(properties));
}

Comment From: jucosorin

For the moment I ended up implementing my own KafkaConnectionDetails and, taking inspiration from KafkaAutoConfiguration, made it all work. In Testcontainers integration tests I get a KafkaContainerConnectionDetails injected; in production I get my own implementation that adapts from the KafkaProperties object.

I think we would benefit from a public PropertiesKafkaConnectionDetails. It would definitely help in cases related to mine.

Comment From: jucosorin

Hi @wilkinsona. Has any decision been made about having a public PropertiesKafkaConnectionDetails? I definitely think this would help folks writing libraries using auto-configurations where KafkaConnectionDetails is involved.

Comment From: wilkinsona

We aren't going to make the properties-based adapters public, but we are going to change the auto-configurations so that a …ConnectionDetails bean is always defined. In the absence of a …ConnectionDetails bean from another source, each auto-configuration will define a bean that adapts the configuration properties to an implementation of the relevant …ConnectionDetails API.
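In Spring terms, the change described would presumably amount to each auto-configuration declaring a fallback bean along these lines (illustrative only: this fragment assumes Spring Boot on the classpath, PropertiesKafkaConnectionDetails remains internal, and the exact shape is an assumption):

```java
@Bean
@ConditionalOnMissingBean(KafkaConnectionDetails.class)
KafkaConnectionDetails kafkaConnectionDetails(KafkaProperties properties) {
    // Only created when nothing else (e.g. a @ServiceConnection-annotated
    // Testcontainers container) has contributed a KafkaConnectionDetails bean
    return new PropertiesKafkaConnectionDetails(properties);
}
```

With such a bean always present, a starter like the one above could inject KafkaConnectionDetails directly and never touch KafkaProperties for connection settings.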