Through containerFactory.getContainerProperties().setListenerTaskExecutor(...), the factory can be configured with an AsyncTaskExecutor. We should see if we can auto-configure this, particularly when virtual threads are enabled as it could then be configured with a VirtualThreadTaskExecutor.
Comment From: mhalbritter
When virtual threads are enabled, we configure the container to use a SimpleAsyncTaskExecutor with virtual threads.
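As a rough, JDK-only illustration of what running on a virtual thread means (this is not the Boot auto-configuration itself), the sketch below submits a task to an executor that starts one virtual thread per task and reports `Thread.currentThread().isVirtual()`. It assumes Java 21 or later; the class name `VirtualThreadCheck` and its method are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class, not part of Spring Boot or Spring Kafka.
class VirtualThreadCheck {

    // Runs a small task on a freshly started virtual thread and returns
    // whether the task observed itself running on a virtual thread.
    static boolean runsOnVirtualThread() {
        Callable<Boolean> check = () -> Thread.currentThread().isVirtual();
        // ExecutorService is AutoCloseable since Java 19; close() waits for the task.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.submit(check).get();
        } catch (Exception ex) {
            throw new IllegalStateException("Virtual thread check failed", ex);
        }
    }

    public static void main(String[] args) {
        // The thread that runs main is a platform thread.
        System.out.println("caller virtual: " + Thread.currentThread().isVirtual());
        System.out.println("task virtual:   " + runsOnVirtualThread());
    }
}
```

The same `isVirtual()` call can be logged from inside a listener method to see which kind of thread invokes it.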
Comment From: ultramagnus94
I'm using Spring Boot 3.2.2.
When I log Thread.currentThread().isVirtual() inside the method annotated with @KafkaListener, it still prints false:
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.OFFSET) int offset) {
    log.info("{}", Thread.currentThread().isVirtual()); // false
    consumerService.process(message);
}
Comment From: mhalbritter
Do you have a small sample application which I could use to reproduce that?
Comment From: ultramagnus94
Due to my company policy, I cannot upload any source code here. I'll do it when I'm home.
The setup is quite simple, though.
By the way, I'm using spring-kafka version 3.1.1.
Comment From: ultramagnus94
After several tests, I think I found the problem. If I use the default consumer factory, the listener runs on a virtual thread, but I had a custom ConsumerFactory bean:
@Bean
@Primary
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my bootstrap server address");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my group id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(SECURITY_PROTOCOL, "SASL_PLAINTEXT");
    props.put(SECURITY_MECHANISM, "PLAIN");
    props.put(SASL_JAAS_CONFIG, "my sasl configuration");
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}
I think this causes the problem. By the way, is there any configuration to make this use virtual threads?
Comment From: mhalbritter
You can call getContainerProperties().setListenerTaskExecutor(...) on your ConcurrentKafkaListenerContainerFactory to supply it with a SimpleAsyncTaskExecutor that has virtual threads enabled through SimpleAsyncTaskExecutor.setVirtualThreads(true). That's what our auto-configuration does.
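A minimal sketch of that suggestion, applied to the containerFactory bean posted above. It assumes spring-kafka 3.x and Spring Framework 6.1 or later on Java 21, and sets the executor through the factory's ContainerProperties, the same path mentioned at the top of this issue. The configuration class name and the "kafka-listener-" thread name prefix are illustrative choices, not something Boot requires.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; only the executor wiring is the point here.
@Configuration
class KafkaVirtualThreadConfig {

    @Bean
    @Primary
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Executor that starts a new virtual thread for each listener container task.
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-listener-");
        executor.setVirtualThreads(true);

        // Containers created by this factory run their consumer loop on this executor.
        factory.getContainerProperties().setListenerTaskExecutor(executor);
        return factory;
    }
}
```

With this in place, logging Thread.currentThread().isVirtual() inside the @KafkaListener method is a quick way to confirm which kind of thread the listener runs on.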
Comment From: Moscagus
I'm using Spring Boot 3.2.0 and I also set SimpleAsyncTaskExecutor.setVirtualThreads(true). The problem I see, checking with jcmd and jstack, is that each listener stays tied to a platform thread forever, so virtual threads are not used effectively. With no messages in Kafka and a concurrency of 10, I have 10 platform threads carrying the virtual threads.
[oracle@vtuerto kafka-2602]$ cat jstack.txt | grep -A 1 ForkJoinPool-
"ForkJoinPool-1-worker-1" #39 [99] daemon prio=5 os_prio=0 cpu=2217.82ms elapsed=1798.28s allocated=136M defined_classes=707 tid=0x00007f49ef305fc0 [0x00007f49c3bfd000]
**Carrying virtual thread #59**
--
"ForkJoinPool-1-worker-2" #40 [100] daemon prio=5 os_prio=0 cpu=1901.63ms elapsed=1798.27s allocated=94034K defined_classes=125 tid=0x00005607582c69e0 [0x00007f49c3b7d000]
**Carrying virtual thread #43**
--
"ForkJoinPool-1-worker-3" #41 [101] daemon prio=5 os_prio=0 cpu=1723.87ms elapsed=1798.27s allocated=54037K defined_classes=275 tid=0x0000560758173da0 nid=101 waiting on condition [0x00007f49c3afe000]
java.lang.Thread.State: TIMED_WAITING (parking)
--
"ForkJoinPool-1-worker-4" #48 [105] daemon prio=5 os_prio=0 cpu=1803.04ms elapsed=1797.83s allocated=91141K defined_classes=60 tid=0x00007f49ec4692d0 [0x00007f49c3831000]
**Carrying virtual thread #45**
--
"ForkJoinPool-1-worker-5" #55 [111] daemon prio=5 os_prio=0 cpu=1706.78ms elapsed=1797.62s allocated=87998K defined_classes=0 tid=0x00005607577a4cd0 [0x00007f49c3531000]
**Carrying virtual thread #75**
--
"ForkJoinPool-1-worker-6" #56 [112] daemon prio=5 os_prio=0 cpu=4311.24ms elapsed=1797.61s allocated=263M defined_classes=2987 tid=0x00007f49ef397e90 nid=112 waiting on condition [0x00007f49c34b2000]
java.lang.Thread.State: WAITING (parking)
--
"ForkJoinPool-1-worker-7" #63 [117] daemon prio=5 os_prio=0 cpu=1725.83ms elapsed=1797.46s allocated=87727K defined_classes=4 tid=0x0000560757b28c60 [0x00007f49c3231000]
**Carrying virtual thread #50**
--
"ForkJoinPool-1-worker-8" #64 [118] daemon prio=5 os_prio=0 cpu=1913.82ms elapsed=1797.43s allocated=92419K defined_classes=0 tid=0x00007f49ef3d1c40 [0x00007f49c31b1000]
**Carrying virtual thread #47**
--
"ForkJoinPool-1-worker-9" #68 [121] daemon prio=5 os_prio=0 cpu=1762.17ms elapsed=1797.40s allocated=85613K defined_classes=0 tid=0x00005607577a22e0 [0x00007f49beffd000]
**Carrying virtual thread #67**
--
"ForkJoinPool-1-worker-10" #72 [124] daemon prio=5 os_prio=0 cpu=1732.98ms elapsed=1797.35s allocated=90651K defined_classes=0 tid=0x00007f49ef3e71a0 [0x00007f49bee7d000]
**Carrying virtual thread #62**
--
"ForkJoinPool-1-worker-11" #76 [127] daemon prio=5 os_prio=0 cpu=1917.60ms elapsed=1797.31s allocated=83567K defined_classes=103 tid=0x000056075837ffd0 [0x00007f49becfd000]
**Carrying virtual thread #71**
--
"ForkJoinPool-1-worker-12" #82 [130] daemon prio=5 os_prio=0 cpu=1524.40ms elapsed=1789.37s allocated=52890K defined_classes=3 tid=0x0000560757f3f7d0 [0x00007f49beb7d000]
**Carrying virtual thread #38**
--
"ForkJoinPool-1-worker-13" #92 [137] daemon prio=5 os_prio=0 cpu=2644.57ms elapsed=1769.39s allocated=106M defined_classes=934 tid=0x00007f49ec1a6a60 nid=137 waiting on condition [0x00007f49be7fe000]
java.lang.Thread.State: WAITING (parking)
Comment From: wilkinsona
@Moscagus If you have a question about configuring Spring Kafka, please ask on Stack Overflow. If you believe you've found a bug in how Spring Kafka manages its threads, please open a Spring Kafka issue.
Comment From: Moscagus
Ok thanks