Spring Boot Version: 2.7.9
Hello,
I am having an issue in a @SpringBootTest that also defines a mock using @MockBean.
Here you'll find a small GitHub Project that demonstrates the issue: https://github.com/gossie/spring-boot-embeddedkafka-mockbean-error-showcase
There are two tests.
DemoApplicationTests doesn't use any mocks, the test is green and everything works fine.
DemoApplicationWithMockTests creates the ThirdService as mock and the test that should be green is red.
Here is what I have found out so far (all observed on my machine):
* In org.springframework.kafka.listener.KafkaMessageListenerContainer#doStart (line 355) a ListenableFuture is created and started asynchronuously.
* In the second test case (the one with with the mock) the future is executed to late, that means after the test execution started. So the KafkaTemplate sends the event, but the EmbeddedKafka is actually not ready yet.
And I think that is why. First let's go through the test without mock:
* In org.springframework.test.context.TestContextManager#prepareTestInstance the TestExecutionListeners are called.
* The SpringBootDependencyInjectionTestExecutionListener and loads the application context. The includes the kafka and starting the asynchronuous task.
* Everything goes its way and at some point we arrive in the SpringExtension and in org.springframework.test.context.TestContextManager#beforeTestMethod where all the TestExecutionListeners are called again.
* When the ResetMocksTestExecutionListener is execution, we end up in this method org.springframework.boot.test.mock.mockito.MockReset#get. When Mockito.mockingDetails(mock) should be executed, the debugger hits the breakpoint in the asynchronuous task that was started earlier. I think that is because Mockito hasn't been used yet, it needs to be loaded and the scheduler has time to execute the other thread (just guessing).
* Now everything is started up, the test is executed => green test
Now the test with the @MockBean annotation:
* Instead of the SpringBootDependencyInjectionTestExecutionListener the MockitoTestExecutionListener loads the application context.
* The mocks are initialized and Mockito was already in use.
* When the ResetMocksTestExecutionListener nothing needs to be loaded since Mockito was already in use.
* The tese is executed and a message is sent via the KafkaTemplate and the test waits for the expected condition.
* Now the asynchronuous task is executed, but it's too late, the message was already sent => red test
Just for verification I added a Thread.sleep(5000) as the first instruction to org.springframework.test.context.junit.jupiter.SpringExtension#beforeTestExecution. That way the asynchronuous task was executed before the test is executed and it was green. But as I said, that was just to verify.
Comment From: wilkinsona
Thanks very much for the sample and analysis of the problem.
As far as I can tell, the race condition is entirely within Spring Kafka's KafkaMessageListenerContainer. Its doStart() method uses a CountDownLatch to wait for the consumer thread. This latch is counted down by the ListenerConsumer immediately before it publishes the ConsumerStartingEvent. Once counted down, the latch allows the main thread to proceed and the tests to execute. Crucially, the count down happens before the consumer thread has been set up and before it has actually started consumption. This results in the race condition between the main thread executing the test and consumption beginning.
The timing window in which the race occurs varies in size depending on the processing that the main thread still has to do before the test can execute. Using @MockBean reduces the work that the main thread has to do (as it has already been done earlier) which makes it more likely that it will win the race and that the test will execute before consumption has begun. The problem can be reproduced without Mockito or @MockBean by using the debugger to affect the timing window. To do so, debug DemoApplicationTests with a breakpoint on the call to pollAndInvoke() in the listener consumer. With the thread suspended at this point, the main thread will be allowed to proceed. The test executes and then times out.
To fix this reliably the race needs to be eliminated entirely and that will require a change in Spring Kafka. Can you please open a new Spring Kafka issue linking to this one?
/cc @garyrussell
Comment From: garyrussell
The latch is counted down in publishConsumerStartingEvent() so it is not intended to indicate the consumer is ready to receive messages, even the ConsumerStartedEvent doesn't mean that the consumer has actually been assigned any partitions yet, it is just used to make sure there are enough threads in the task executor (if externally provided) for the configured concurrency.
but the EmbeddedKafka is actually not ready yet.
When you say EmbeddedKafka, I assume you mean the consumer is not ready yet (if the broker isn't ready, the send will fail).
There are several ways to write tests to handle this condition.
- Set
spring.kafka.consumer.auto-offset-reset=earliest(it defaults to latest) so the consumer will receive the already sent record when it subscribes. - Add a rebalance listener and wait the partition assignment
- Call
ContainerTestUtils.waitForAssignment()
Comment From: gossie
Thank you! I went with the property and it works fine.
Comment From: ScrambledRK
I had a similar issue when running my entire test suite with two integration test classes annotated by @EmbeddedKafka and @SpringBootTest. I am using spring-boot 3.4.0. I comment here because this thread finally helped me.
The first test sending and receiving a Kafka event of the second test class would always fail, because it would never receive any events. If I only run this failing test method (or even its entire test class) in isolation it would succeed again. If I would debug the failing test when running the entire test suit it would also succeed.
The problem was fixed for me using ContainerTestUtils.waitForAssignment().
@BeforeEach
void setUp(@Autowired KafkaListenerEndpointRegistry registry) {
MessageListenerContainer container = Objects.requireNonNull(
registry.getListenerContainer(MyKafkaListenerContainer.ID)
);
ContainerTestUtils.waitForAssignment(container, 2); // 2 is default number of topics
}
I already had spring.kafka.consumer.auto-offset-reset=earliest set, but I am using a custom container and consumer factory and even after explicitly setting this configuration property to "earliest" it still had no effect. But it is entirely likely I made a mistake here since I am new to spring.
I first suspected an issue with context cleanup (so I tried @DirtiesContext), or both classes using the same topics or that it might be related to the shared groupId. None of that had any effect. I also read someplace that it might be related to rebalancing but I was hoping for an easy way out and found this thread.
I still don't understand if its a bug or if I am doing something wrong, but I thank you all.