Hello.
I have a Spring Boot project that uses Testcontainers Kafka for integration tests. I use the following to add the Kafka container to the @SpringBootTest:
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(TESTCONTAINERS_KAFKA_IMAGE))
        .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "FALSE");
In order to inject the spring.kafka.bootstrap-servers I use @DynamicPropertySource like:
@DynamicPropertySource
static void setProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
}
Since upgrading to 3.1.0-M2, injecting the spring.kafka.bootstrap-servers fails and the root exception is:
Caused by: java.lang.NumberFormatException: For input string: "//localhost:54456"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
at java.base/java.lang.Integer.parseInt(Integer.java:654)
at java.base/java.lang.Integer.parseInt(Integer.java:786)
at org.springframework.boot.autoconfigure.kafka.PropertiesKafkaConnectionDetails.asNode(PropertiesKafkaConnectionDetails.java:72)
Debugging this, I've tracked it down to org.springframework.boot.autoconfigure.kafka.PropertiesKafkaConnectionDetails and the following method:
private Node asNode(String bootstrapNode) {
    int separatorIndex = bootstrapNode.indexOf(':');
    if (separatorIndex == -1) {
        return new Node(bootstrapNode, this.DEFAULT_PORT);
    }
    return new Node(bootstrapNode.substring(0, separatorIndex),
            Integer.parseInt(bootstrapNode.substring(separatorIndex + 1)));
}
This method expects a bootstrapNode string like "localhost:9092", but KafkaContainer.getBootstrapServers() returns "PLAINTEXT://localhost:9092".
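Until this is addressed, a possible workaround (just a sketch, assuming the single PLAINTEXT://host:port value that KafkaContainer returns) is to strip the scheme prefix before registering the property:

@DynamicPropertySource
static void setProperties(DynamicPropertyRegistry registry) {
    // Drop the leading "PLAINTEXT://" (or any other scheme) so that only a
    // host:port pair reaches PropertiesKafkaConnectionDetails.asNode()
    registry.add("spring.kafka.bootstrap-servers",
            () -> kafkaContainer.getBootstrapServers().replaceFirst("^\\w+://", ""));
}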
@wilkinsona I see that this class was introduced recently. If you agree that this is a bug, I could look into it and try to resolve it.
Comment From: quaff
It's a regression introduced by https://github.com/spring-projects/spring-boot/commit/042f0c852053e200abe270764721a94c16d80533. It seems it could be fixed by changing indexOf to lastIndexOf.
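For reference, a minimal sketch of that suggestion (note that the protocol prefix would still end up in the host portion unless it is stripped separately):

private Node asNode(String bootstrapNode) {
    // Use the last ':' so that the characters after a "protocol://" prefix
    // are no longer mistaken for the port
    int separatorIndex = bootstrapNode.lastIndexOf(':');
    if (separatorIndex == -1) {
        return new Node(bootstrapNode, this.DEFAULT_PORT);
    }
    return new Node(bootstrapNode.substring(0, separatorIndex),
            Integer.parseInt(bootstrapNode.substring(separatorIndex + 1)));
}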
Comment From: wilkinsona
Thanks for the report. This working in the past was a happy accident. The property is documented as being a list of host:port pairs:
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
Given the above, I'm not sure that we should consider this to be a bug. We can see if it's possible for this to work intentionally, but we'll have to consider how the scheme and spring.kafka.security.protocol should interact. That can add quite a bit of complexity, as I've seen with other services such as RabbitMQ, so it might be something that we decide we'd rather avoid.
Comment From: jucosorin
Should I open a bug with Testcontainers Kafka and mention this issue?
I understand your point @wilkinsona after taking a better look at this.
But still, there are a lot of folks using Testcontainers Kafka and @DynamicPropertySource. What should we do once 3.1.0 goes GA?
Comment From: quaff
@jucosorin I've created PR https://github.com/spring-projects/spring-boot/pull/34774; let the team decide whether to merge it or not.
Comment From: wilkinsona
Should I open a bug with Testcontainers Kafka and mention this issue?
If an issue is to be raised against Testcontainers Kafka, it should be for more general reasons than how Spring Boot behaves. One such reason would be how Kafka's client describes its bootstrap.servers property in org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_DOC:
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping – this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form
host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
By including the protocol, org.testcontainers.containers.KafkaContainer.getBootstrapServers() returns a value that isn't in the documented form. However, in its implementation Kafka's client uses org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(List<String>, ClientDnsLookup) to turn a List<String> of bootstrap servers into a List<InetSocketAddress> using a regex-based approach that "matches URIs of formats: host:port and protocol://host:port".
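Purely to illustrate the behaviour described above (this is not Kafka's actual implementation, and the regex here is only an approximation), something along these lines accepts both the documented host:port form and protocol://host:port:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BootstrapServerParsingDemo {

    // Approximation of the described behaviour: an optional scheme followed by host:port
    private static final Pattern HOST_AND_PORT = Pattern.compile("^(?:[a-zA-Z0-9+.-]+://)?(.+):(\\d+)$");

    public static void main(String[] args) {
        for (String server : new String[] { "localhost:9092", "PLAINTEXT://localhost:9092" }) {
            Matcher matcher = HOST_AND_PORT.matcher(server);
            if (matcher.matches()) {
                // Prints host=localhost port=9092 for both inputs
                System.out.println("host=" + matcher.group(1) + " port=" + matcher.group(2));
            }
        }
    }
}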
In summary:
- Kafka's property documentation is not aligned with its runtime behavior
- KafkaContainer.getBootstrapServers() relies on Kafka's undocumented(?) runtime behavior if you use its return value as-is
- The description of spring.kafka.bootstrap-servers matches Kafka's property documentation
- Up until Spring Boot 3.1.0-M2, Spring Boot has unintentionally supported protocol://host:port entries in spring.kafka.bootstrap-servers due to Kafka's runtime behavior
Comment From: jucosorin
@wilkinsona Thank you for the detailed explanation. I will open a bug with Testcontainers and mention your comments.
In the meantime, @KafkaServiceConnection should do the trick as its implementation of KafkaContainerConnectionDetails uses URI.create():
@Override
public List<Node> getBootstrapNodes() {
    URI uri = URI.create(this.container.getBootstrapServers());
    return List.of(new Node(uri.getHost(), uri.getPort()));
}
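For context, java.net.URI treats the PLAINTEXT prefix as the URI scheme, so the host and port come back without it; a quick standalone check:

import java.net.URI;

class UriBootstrapServerDemo {

    public static void main(String[] args) {
        // "PLAINTEXT" is parsed as the URI scheme, leaving host and port intact
        URI uri = URI.create("PLAINTEXT://localhost:54456");
        System.out.println(uri.getHost()); // localhost
        System.out.println(uri.getPort()); // 54456
    }
}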
The problem, however, is that in my particular case I'm using a custom Spring Boot starter that bootstraps the Spring Kafka code, and its configuration kicks in before the @KafkaServiceConnection code that resolves the bootstrap servers from the Testcontainers container. Somehow @KafkaServiceConnection does not replace the default bootstrap.servers property of "localhost:9092" in the Environment.
I've opened #34776 for this.
Comment From: philwebb
Looking at the code, we currently parse strings to create the Node records then rebuild Strings when we pass them to the client. I'm wondering if the Node actually buys us much? Perhaps we should just use strings and leave Kafka to decide how they are parsed.
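Purely as a hypothetical sketch of that idea (none of these names are actual Spring Boot API), the connection details could expose plain strings and pass them through untouched:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;

// Hypothetical, simplified shape: no Node record, just strings that the
// Kafka client parses however it sees fit
interface StringBasedKafkaConnectionDetails {

    List<String> getBootstrapServers();

    default void applyTo(Map<String, Object> clientProperties) {
        clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                String.join(",", getBootstrapServers()));
    }
}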