package com.example.demo;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.then;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.testcontainers.context.ImportTestcontainers;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
public class KafkaTests {

    private static final String TEST_TOPIC_NAME = "test-topic";

    private static final String TEST_GROUP_NAME = "test-group";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // @Autowired
    @SpyBean
    private TestListener testListener;

    @Test
    void test() throws Exception {
        this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
        Thread.sleep(1000);
        then(testListener).should().receive("test");
        assertThat(testListener.latestMessage).isEqualTo("test"); // will works if change @SpyBean to @Autowired
    }

    @TestConfiguration
    @ImportTestcontainers
    static class Config {
        @Container
        @ServiceConnection
        static KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))
                .withEmbeddedZookeeper();

        @Bean
        TestListener testListener() {
            return new TestListener();
        }
    }

    static class TestListener {

        String latestMessage;

        @KafkaListener(groupId = TEST_GROUP_NAME, topics = TEST_TOPIC_NAME)
        void receive(String message) {
            this.latestMessage = message;
        }

    }
}

here is sample project kafka-demo.zip

Comment From: wilkinsona

Thanks for the sample, @quaff.

The listener also isn't called if a regular Mockito spy is used:

package com.example.demo;

import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.spy;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.context.ImportTestcontainers;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(properties = "logging.level.org.springframework.kafka=debug")
public class KafkaTests {

    private static final String TEST_TOPIC_NAME = "test-topic";

    private static final String TEST_GROUP_NAME = "test-group";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private TestListener testListener;

    @Test
    void test() throws Exception {
        this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
        Thread.sleep(1000);
        then(testListener).should().receive("test");
        // assertThat(testListener.latestMessage).isEqualTo("test");
    }

    @TestConfiguration
    @ImportTestcontainers
    static class Config {
        @Container
        @ServiceConnection
        static KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))
                .withEmbeddedZookeeper();

        @Bean
        TestListener testListener() {
            return spy(new TestListener());
        }
    }

    static class TestListener {

        String latestMessage;

        @KafkaListener(groupId = TEST_GROUP_NAME, topics = TEST_TOPIC_NAME)
        void receive(String message) {
            System.out.println(message);
            this.latestMessage = message;
        }

    }
}

As such, I don't think this is a Spring Boot problem and should be addressed in Spring Kafka.

Comment From: garyrussell

The listener is detected just fine; the problem is the send is performed before the listener has started consuming and the default auto offset reset is latest so it doesn't get already published records.

This works fine...

@SpringBootTest(properties = { "logging.level.org.springframework.kafka=debug",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

For both @SpyBean and an explicit spy().

However, I would suggest a more robust mechanism than Thread.sleep(), such as:

    @Test
    void test() throws Exception {
        this.kafkaTemplate.send(TEST_TOPIC_NAME, "test");
        CountDownLatch latch = new CountDownLatch(1);
        willAnswer(inv -> {
            inv.callRealMethod();
            latch.countDown();
            return null;
        }).given(this.testListener).receive(any());
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        then(testListener).should().receive("test");
        assertThat(testListener.latestMessage).isEqualTo("test");
    }