When using batch listeners with Kafka, if the listener method declares its parameter as a Kotlin List<Message<*>>,
the ClassUtils.isAssignable check, which tests whether the parameter is "compatible" with java.util.List, does not work with Kotlin's default List. The check is in:
org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument
Kotlin uses List from kotlin.collections by default, and the reflection test reports the types as not assignable.
Expected behavior: Kotlin's List should work identically to the Java version.
Comment From: sdeleuze
I think Kotlin lists are expected to be java.util.List as well, see this code sample.
Could you please provide a reproducer?
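For readers following along: the sample linked above is not reproduced here. The following is only a minimal sketch of the same point, assuming nothing beyond the Kotlin standard library on the JVM. It shows that kotlin.collections.List is mapped to java.util.List, so a plain class-level assignability check should pass.

```kotlin
fun main() {
    // A read-only Kotlin list, typed as kotlin.collections.List.
    val records: List<String> = listOf("a", "b")

    // On the JVM, kotlin.collections.List is a mapped type: its Java class is java.util.List.
    val kotlinListClass: Class<*> = List::class.java
    println(kotlinListClass.name) // java.util.List

    // The runtime object therefore passes a class-level assignability check.
    println(kotlinListClass.isAssignableFrom(records.javaClass)) // true
}
```

If this holds, the difference in behavior has to come from the generic type argument rather than from the List class itself.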
Comment From: samyempcc
I will try to put together a full reproducer once I get some time. Meanwhile, this workaround fixed it for me:
@KafkaListener()
fun batchListener(records: java.util.List<Message<*>?>) {..}
With this signature, records is a list of GenericMessage.
When using Kotlin's default List like:
@KafkaListener()
fun batchListener(records: List<Message<*>?>) {..}
records is not a list of Message, but a list of the payloads inside the messages.
Comment From: sdeleuze
I would be really interested in a repro to understand what happens here. When using Kotlin List, what types do you get here for targetClass and payloadClass?
Comment From: spring-projects-issues
If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.
Comment From: samyempcc
I would be really interested in a repro to understand what happens here. When using Kotlin List, what types do you get here for targetClass and payloadClass?
Digging further, I saw the root cause is actually in: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java#L682
With Kotlin, the paramType resolves to:
? extends org.springframework.messaging.Message<?>
while for Java it is:
org.springframework.messaging.Message<?>
The rest of that method decides how to assign the payload content based on a test against paramType, so it ends up behaving differently for Kotlin.
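The difference in paramType can be observed without Spring or Kafka. The following is a minimal sketch, not the actual adapter code: Message here is a hypothetical stand-in for org.springframework.messaging.Message, and Listeners and elementType are illustrative names. It inspects the generic parameter type the Kotlin compiler emits for each signature, using plain Java reflection.

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.WildcardType

// Hypothetical stand-in for org.springframework.messaging.Message (an assumption for this sketch).
interface Message<T> {
    val payload: T
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNUSED_PARAMETER")
class Listeners {
    // Kotlin's List is covariant (List<out E>), so the compiler emits a wildcard here.
    fun kotlinList(records: List<Message<*>>) {}

    // java.util.List is invariant, as in the workaround above, so no wildcard is emitted.
    fun javaList(records: java.util.List<Message<*>>) {}
}

// Returns the first type argument of the first parameter of the named method.
fun elementType(methodName: String): Type {
    val method = Listeners::class.java.declaredMethods.first { it.name == methodName }
    val listType = method.genericParameterTypes[0] as ParameterizedType
    return listType.actualTypeArguments[0]
}

fun main() {
    println(elementType("kotlinList").typeName)
    println(elementType("kotlinList") is WildcardType) // true
    println(elementType("javaList").typeName)
    println(elementType("javaList") is WildcardType) // false
}
```

Code that compares the type argument directly against Message, without unwrapping the wildcard's upper bound, would treat these two signatures differently, which is consistent with the behavior described above.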
I have working code that reproduces the issue, given a local Kafka running on port 9092:
Comment From: sdeleuze
Thanks for digging into that. Indeed, as discussed in #22313, Kotlin lists involve declaration-site variance, so extra care should be taken for this use case. The related code seems to be on the Spring Kafka side, so please open a related issue on https://github.com/spring-projects/spring-kafka/issues. cc @garyrussell