This test fails with a timeout exception (logs included below). The previous Spring Boot version, 3.1.5, works fine. Can I get some assistance with this, please?
@SpringBootTest
@DirtiesContext
@ContextConfiguration(classes = KafkaClientPropertiesSaslPlaintextTest.SpringConfig.class)
@EmbeddedKafka(brokerProperties = { "auto.create.topics.enable = false",
        "listeners = SASL_PLAINTEXT://localhost:12000, PLAINTEXT://localhost:12001",
        "sasl.enabled.mechanisms = PLAIN" },
        topics = { "data-topic" })
public class KafkaClientPropertiesSaslPlaintextTest
{
    @RegisterExtension
    public static LoggingExtension logging = LoggingExtension.newBuilder().build();

    public static class SpringConfig
    {
        private String brokerAddresses = "localhost:12000";

        @Bean
        public KafkaClientProperties kafkaProperties()
        {
            KafkaClientProperties properties = new KafkaClientProperties();
            properties.setBootstrapServers(brokerAddresses);
            SecurityProperties security = new SecurityProperties();
            security.setProtocol("SASL_PLAINTEXT");
            properties.setSecurity(security);
            SaslProperties sasl = new SaslProperties();
            sasl.setUsername("test");
            sasl.setPassword("test-secret");
            properties.setSasl(sasl);
            return properties;
        }

        @Bean
        public KafkaProducer<String, String> producer(KafkaClientProperties kafkaProperties)
        {
            Map<String, Object> props = new HashMap<>();
            props.putAll(kafkaProperties.asProperties());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new KafkaProducer<>(props);
        }

        @Bean
        public TestConsumer<String, String> consumer(KafkaClientProperties kafkaProperties)
        {
            Map<String, Object> props = new HashMap<>();
            props.putAll(kafkaProperties.asProperties());
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-client");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            TestConsumer<String, String> consumer = new TestConsumer<>(props);
            consumer.subscribe("data-topic");
            return consumer;
        }
    }

    @Autowired
    private KafkaProducer<String, String> producer;

    @Autowired
    private TestConsumer<String, String> consumer;
    @Test
    public void testSerialization()
    {
        producer.send(new ProducerRecord<>("data-topic", "key1", "value1"));
        producer.send(new ProducerRecord<>("data-topic", "key2", "value2"));
        Iterator<ConsumerRecord<String, String>> records = consumer.poll(2, Duration.ofSeconds(10L)).iterator();
        ConsumerRecord<String, String> record = records.next();
        assertThat(record.key()).isEqualTo("key1");
        assertThat(record.value()).isEqualTo("value1");
        record = records.next();
        assertThat(record.key()).isEqualTo("key2");
        assertThat(record.value()).isEqualTo("value2");
        assertThat(records.hasNext()).isFalse();
    }
}
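For context, KafkaClientProperties, SecurityProperties, SaslProperties and TestConsumer are helper classes from our own test support code and are not shown here. The asProperties() method is assumed to translate the settings above into standard Kafka client properties roughly as follows (a sketch only, using the usual PlainLoginModule JAAS entry for SASL/PLAIN; the constants come from CommonClientConfigs and SaslConfigs in kafka-clients):

// Hypothetical sketch of what KafkaClientProperties.asProperties() is assumed to return
// for the configuration above; the real helper lives in our own code and is not shown.
public Map<String, Object> asProperties()
{
    Map<String, Object> props = new HashMap<>();
    // Broker address configured on the properties object
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:12000");
    // SASL over a plaintext connection, authenticating with the PLAIN mechanism
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required"
                    + " username=\"test\" password=\"test-secret\";");
    return props;
}

This matches the security.protocol = SASL_PLAINTEXT, sasl.mechanism = PLAIN and sasl.jaas.config = [hidden] entries in the consumer configuration dump below.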
Consumer configuration and log output from the failing run:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [localhost:12000]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = test-client
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = PLAIN
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2024-01-25T15:08:29.499Z WARN 1792 --- [ad | producer-1] o.a.k.c.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:54760) could not be established. Broker may not be available.
2024-01-25T15:08:29.582Z INFO 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Node -1 disconnected.
2024-01-25T15:08:29.582Z WARN 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Connection to node -1 (localhost/127.0.0.1:12000) could not be established. Broker may not be available.
2024-01-25T15:08:29.583Z WARN 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Bootstrap broker localhost:12000 (id: -1 rack: null) disconnected
2024-01-25T15:08:29.674Z INFO 1792 --- [Map-uuidTopic-0] o.a.k.c.NetworkClient : [Consumer clientId=uuidTopicGroup-consumer, groupId=uuidTopicGroup-consumerGroup] Node 0 disconnected.
2024-01-25T15:08:29.674Z WARN 1792 --- [Map-uuidTopic-0] o.a.k.c.NetworkClient : [Consumer clientId=uuidTopicGroup-consumer, groupId=uuidTopicGroup-consumerGroup] Connection to node 0 (localhost/127.0.0.1:54861) could not be established. Broker may not be available.
2024-01-25T15:08:29.759Z INFO 1792 --- [-thread | testT] o.a.k.c.NetworkClient : [Consumer clientId=consumer-testT-1, groupId=testT] Node 0 disconnected.
2024-01-25T15:08:29.759Z WARN 1792 --- [-thread | testT] o.a.k.c.NetworkClient : [Consumer clientId=consumer-testT-1, groupId=testT] Connection to node 0 (localhost/127.0.0.1:54861) could not be established. Broker may not be available.
2024-01-25T15:08:29.981Z INFO 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Node -1 disconnected.
2024-01-25T15:08:29.981Z WARN 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Connection to node -1 (localhost/127.0.0.1:12000) could not be established. Broker may not be available.
2024-01-25T15:08:29.981Z WARN 1792 --- [ad | producer-7] o.a.k.c.NetworkClient : [Producer clientId=producer-7] Bootstrap broker localhost:12000 (id: -1 rack: null) disconnected
2024-01-25T15:08:30.077Z INFO 1792 --- [p-consumerGroup] o.a.k.c.NetworkClient : [Consumer clientId=testGroup-consumer, groupId=testGroup-consumerGroup] Node 0 disconnected.
2024-01-25T15:08:30.077Z WARN 1792 --- [p-consumerGroup] o.a.k.c.NetworkClient : [Consumer clientId=testGroup-consumer, groupId=testGroup-consumerGroup] Connection to node 0 (localhost/127.0.0.1:54981) could not be established. Broker may not be available.
2024-01-25T15:08:30.399Z INFO 1792 --- [ad | producer-1] o.a.k.c.NetworkClient : [Producer clientId=producer-1] Node 0 disconnected.
2024-01-25T15:08:30.399Z WARN 1792 --- [ad | producer-1] o.a.k.c.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:54760) could not be established. Broker may not be available.
2024-01-25T15:08:30.435Z INFO 1792 --- [ad | producer-2] o.a.k.c.NetworkClient : [Producer clientId=producer-2] Node 0 disconnected.
2024-01-25T15:08:30.435Z WARN 1792 --- [ad | producer-2] o.a.k.c.NetworkClient : [Producer clientId=producer-2] Connection to node 0 (localhost/1
Comment From: philwebb
This looks like a Kafka issue, but I'm afraid it's hard to tell from the snippets that you've provided. If you'd like us to spend some time investigating, please take the time to provide a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem.
Comment From: wilkinsona
I suspect that it's a duplicate of https://github.com/spring-projects/spring-boot/issues/39055.
Comment From: philwebb
Let's close this as a duplicate for now. We can reopen it if it turns out not to be the case.