package com.example.demo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.then;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.testcontainers.context.ImportTestcontainers;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest
public class KafkaTests {
private static final String TEST_TOPIC_NAME = "test-topic";
private static final String TEST_GROUP_NAME = "test-group";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// @Autowired
@SpyBean
private TestListener testListener;
@Test
void test() throws Exception {
this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
Thread.sleep(1000);
then(testListener).should().receive("test");
assertThat(testListener.latestMessage).isEqualTo("test"); // will works if change @SpyBean to @Autowired
}
@TestConfiguration
@ImportTestcontainers
static class Config {
@Container
@ServiceConnection
static KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))
.withEmbeddedZookeeper();
@Bean
TestListener testListener() {
return new TestListener();
}
}
static class TestListener {
String latestMessage;
@KafkaListener(groupId = TEST_GROUP_NAME, topics = TEST_TOPIC_NAME)
void receive(String message) {
this.latestMessage = message;
}
}
}
here is sample project kafka-demo.zip
Comment From: wilkinsona
Thanks for the sample, @quaff.
The listener also isn't called if a regular Mockito spy is used:
package com.example.demo;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.spy;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.context.ImportTestcontainers;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest(properties = "logging.level.org.springframework.kafka=debug")
public class KafkaTests {
private static final String TEST_TOPIC_NAME = "test-topic";
private static final String TEST_GROUP_NAME = "test-group";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private TestListener testListener;
@Test
void test() throws Exception {
this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
Thread.sleep(1000);
then(testListener).should().receive("test");
// assertThat(testListener.latestMessage).isEqualTo("test");
}
@TestConfiguration
@ImportTestcontainers
static class Config {
@Container
@ServiceConnection
static KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))
.withEmbeddedZookeeper();
@Bean
TestListener testListener() {
return spy(new TestListener());
}
}
static class TestListener {
String latestMessage;
@KafkaListener(groupId = TEST_GROUP_NAME, topics = TEST_TOPIC_NAME)
void receive(String message) {
System.out.println(message);
this.latestMessage = message;
}
}
}
As such, I don't think this is a Spring Boot problem and should be addressed in Spring Kafka.
Comment From: garyrussell
The listener is detected just fine; the problem is the send is performed before the listener has started consuming and the default auto offset reset is latest so it doesn't get already published records.
This works fine...
@SpringBootTest(properties = { "logging.level.org.springframework.kafka=debug",
"spring.kafka.consumer.auto-offset-reset=earliest"})
For both @SpyBean and an explicit spy().
However, I would suggest a more robust mechanism than Thread.sleep(), such as:
@Test
void test() throws Exception {
this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
CountDownLatch latch = new CountDownLatch(1);
willAnswer(inv -> {
inv.callRealMethod();
latch.countDown();
return null;
}).given(this.testListener).receive(any());
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
then(testListener).should().receive("test");
assertThat(testListener.latestMessage).isEqualTo("test");
}