In the process of creating a stream application to test it as a native container I discovered the @ServiceConnection is not working as expected.
I am sharing the repo. https://github.com/corneil/test-processor It contains a Java and Kotlin version of the processor.
The java project contains com.example.testprocessor.testcontainers.TestProcessorServiceConnectionTests that fails with @ServiceConnection
There are versions using @EmbeddedKafka and traditional @Testcontainers with @DynamicPropertySource
I also discovered that some tests fail unless forkEvery is 1
Reproduce
./build-test.sh
./build-fail.sh
Comment From: philwebb
@corneil The error I'm getting is as follows. Is that the same one you have?
Sep 14, 2024 12:48:20 PM org.junit.platform.launcher.core.LauncherConfigurationParameters loadClasspathResource
WARNING: Discovered 2 'junit-platform.properties' configuration files in the classpath; only the first will be used.
Sep 14, 2024 12:48:20 PM org.junit.platform.launcher.core.LauncherConfigurationParameters loadClasspathResource
WARNING: Discovered 2 'junit-platform.properties' configuration files in the classpath; only the first will be used.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.3.3)
2024-09-14T12:48:20.878-07:00 INFO 73173 --- [test-processor] [ main] .t.t.TestProcessorServiceConnectionTests : Starting TestProcessorServiceConnectionTests using Java 17.0.12 with PID 73173 (started by pwebb in /Users/pwebb/projects/spring-boot/samples/gh-42312/test-processor/java)
2024-09-14T12:48:20.879-07:00 DEBUG 73173 --- [test-processor] [ main] .t.t.TestProcessorServiceConnectionTests : Running with Spring Boot v3.3.3, Spring v6.1.12
2024-09-14T12:48:20.879-07:00 INFO 73173 --- [test-processor] [ main] .t.t.TestProcessorServiceConnectionTests : No active profile set, falling back to 1 default profile: "default"
2024-09-14T12:48:49.317-07:00 INFO 73173 --- [test-processor] [ main] c.e.t.t.TestProcessorTestBase : kafkaContainer:created:confluentinc/cp-kafka:latest
2024-09-14T12:50:36.683-07:00 ERROR 73173 --- [test-processor] [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for output-queue
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:298) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.12.jar:6.1.12]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:990) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:628) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58) ~[spring-core-6.1.12.jar:6.1.12]
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46) ~[spring-core-6.1.12.jar:6.1.12]
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1463) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:163) ~[spring-test-6.1.12.jar:6.1.12]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$10(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:383) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$11(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) ~[na:na]
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:310) ~[na:na]
at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[na:na]
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[na:na]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:377) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$6(ClassBasedTestDescriptor.java:290) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:289) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:279) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at java.base/java.util.Optional.orElseGet(Optional.java:364) ~[na:na]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:278) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:106) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:105) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:69) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:94) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:52) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:70) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:100) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:757) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
... 104 common frames omitted
2024-09-14T12:51:36.822-07:00 ERROR 73173 --- [test-processor] [ main] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for input-queue
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:524) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:186) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:139) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:98) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.12.jar:6.1.12]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:990) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:628) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58) ~[spring-core-6.1.12.jar:6.1.12]
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46) ~[spring-core-6.1.12.jar:6.1.12]
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1463) ~[spring-boot-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108) ~[spring-boot-test-3.3.3.jar:3.3.3]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260) ~[spring-test-6.1.12.jar:6.1.12]
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:163) ~[spring-test-6.1.12.jar:6.1.12]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$10(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:383) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$11(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) ~[na:na]
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:310) ~[na:na]
at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[na:na]
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[na:na]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:377) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$6(ClassBasedTestDescriptor.java:290) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:289) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:279) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at java.base/java.util.Optional.orElseGet(Optional.java:364) ~[na:na]
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:278) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:106) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:105) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:69) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:94) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:52) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:70) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:100) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:757) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
... 103 common frames omitted
2024-09-14T12:51:36.942-07:00 INFO 73173 --- [test-processor] [ main] .t.t.TestProcessorServiceConnectionTests : Started TestProcessorServiceConnectionTests in 196.22 seconds (process running for 196.983)
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
2024-09-14T12:51:37.468-07:00 INFO 73173 --- [test-processor] [ main] c.e.t.t.TestProcessorTestBase : sending TestInput{name='Joe', value=1.0} as {"name":"Joe","value":1.0}
2024-09-14T12:51:37.719-07:00 INFO 73173 --- [test-processor] [to input-queue]] c.e.t.t.TestProcessorTestBase : sent:SendResult [producerRecord=ProducerRecord(topic=input-queue, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=[B@620ee6d, timestamp=null), recordMetadata=input-queue-0@0]
2024-09-14T12:52:06.719-07:00 ERROR 73173 --- [test-processor] [ scheduling-1] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for output-queue
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:298) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:376) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.12.jar:6.1.12]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
... 13 common frames omitted
2024-09-14T12:52:06.722-07:00 ERROR 73173 --- [test-processor] [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
org.springframework.core.task.TaskRejectedException: ExecutorService in shutdown state did not accept task: org.springframework.cloud.stream.binding.BindingService$$Lambda$1574/0x00000070016cae70@194127a5
at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:405) ~[spring-context-6.1.12.jar:6.1.12]
at org.springframework.cloud.stream.binding.BindingService.scheduleTask(BindingService.java:432) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.rescheduleProducerBinding(BindingService.java:373) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:379) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.12.jar:6.1.12]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@52e6fa77[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@30c79434[Wrapped task = DelegatingErrorHandlingRunnable for org.springframework.cloud.stream.binding.BindingService$$Lambda$1574/0x00000070016cae70@194127a5]] rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler$1@29232092[Shutting down, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562) ~[na:na]
at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:402) ~[spring-context-6.1.12.jar:6.1.12]
... 10 common frames omitted
Comment From: philwebb
I think the service provider stuff is working as designed. I think the problem is that KafkaTopicProvisioner isn't using the KafkaConnectionDetails abstraction. Looking at the constructor, it calls kafkaProperties.buildAdminProperties() to build adminClientProperties. If you look at Spring Boot's KafkaAutoConfiguration, there's a applyKafkaConnectionDetailsForAdmin method to apply the connection details.
I think you'll need similar code in KafkaTopicProvisioner.
Comment From: corneil
I think the service provider stuff is working as designed. I think the problem is that
KafkaTopicProvisionerisn't using theKafkaConnectionDetailsabstraction. Looking at the constructor, it callskafkaProperties.buildAdminProperties()to buildadminClientProperties. If you look at Spring Boot'sKafkaAutoConfiguration, there's aapplyKafkaConnectionDetailsForAdminmethod to apply the connection details.I think you'll need similar code in
KafkaTopicProvisioner.
Isn't it the job of @ServiceConnection to take care of the auto configuration like @DynamicPropertySource does?
Comment From: wilkinsona
It has. It's auto-configured a KafkaConnectionDetails bean. KafkaTopicProvisioner is ignoring that bean so it has had no effect. As Phil suggested, KafkaTopicProvisioner will have to be updated to consider the connection details.
Comment From: corneil
In this version using @DynamicPropertySource we don't need to do anything else.
Comment From: wilkinsona
Configuration properties are one way of altering the KafkaConnectionDetails bean. A service connection is another. Either way, you'll have to consider the connection details for that alteration to be noticed.
Comment From: corneil
Am I misunderstanding the purpose of @ServiceConnection?
I thought itnis supposed to take case of connection properties for a range of supported TestContainers.
Comment From: wilkinsona
Slightly, yes.
The purpose of @ServiceConnection is to cause the auto-configuration for the service to connect to the container-managed service. That isn't done through configuration properties, though, it's done by configuring a …ConnectionDetails bean.
The …ConnectionDetails bean is used by the auto-configuration when it's configuring the bean(s) that connect to the service. In the absence of a …ConnectionDetails bean that's contributed by a @ServiceConnection, we auto-configure one that's backed by the configuration properties. In other words, a @ServiceConnection or some configuration properties are two different ways of influencing the connection settings that will be provided by a …ConnectionDetails bean but it's the …ConnectionDetails bean that's the canonical source for information on how to connect to the service.
If you're only looking at configuration properties, you're ignoring other possible sources of connection information (Testcontainers and Docker Compose being the main two).
Comment From: philwebb
It works at a higher level than setting properties. That turns out to be problematic for a number of reasons, but the main one is you can end up with a mix of overrides (e.g. the connection URL from one source but the username/password from another).
To get around that problem we introduced the ConnectionDetails abstraction. This is a way for us to define as a Spring bean how connections to various technologies should be established. We retro-fitted all our auto-configuration classes to use ConnectionDetails beans. If there is not ConnectionDetails bean, then we create one that reads properties from the Environment. I think most of the info in this blog is still relevant: https://spring.io/blog/2023/06/19/spring-boot-31-connectiondetails-abstraction
What @ServiceConnection does is create an appropriate ConnectionDetails bean that is backed by Testcontainers.
Comment From: corneil
Doesn't this mean that @ServiceConnection will result in the creation of KafkaConnectionDetails which should provide all that is needed to auto configure Kafka producer or consumer the same as with DynamicPropertySource.
Comment From: philwebb
Doesn't this mean that @ServiceConnection will result in the creation of KafkaConnectionDetails which should provide all that is needed to auto configure Kafka producer or consumer the same as with DynamicPropertySource.
It's not the same as DynamicPropertySource. A DynamicPropertySource will add properties to the Environment, a @ServiceConnection will create a KafkaConnectionDetails bean. As a mentioned in the previous comment, that will work for us because of this code, but not for you because your constructor, it calls kafkaProperties.buildAdminProperties() to build adminClientProperties which is completely unaware of the KafkaConnectionDetails bean.
Comment From: corneil
@philwebb I have created separate tests for Kafka and RabbitMQ to illustrate that @ServiceConnection works for RabbitMQ/AMQP but not for Kafka.
Comment From: wilkinsona
@corneil Have you changed the code in the Kafka binder to take the connections details into account as Phil suggested above? Until you've done that, it's to be expected that it doesn't work. If you've made those changes and it's still not working, please share the Kafka test with us and we can take a look.
Comment From: corneil
@wilkinsona @philwebb
You will see kafka and rabbit tests under test-processor
With both combinations of @ServiceConnection and @DynamicPropertySource and the only failure is the Kafka ServiceConnection test.
Comment From: philwebb
It looks to be like RabbitExchangeQueueProvisioner doesn't have the same kind of logic as KafkaTopicProvisioner. Specifically, it's not injecting a configuration properties class and attempting to establish the connection itself. If you put a breakpoint on RabbitMqContainerConnectionDetails.getAddresses() you can see it's our auto-configuration that's calling this one.
Comment From: wilkinsona
We seem to be going round in circles here. In the hope of breaking of that loop, I've opened a PR that should fix the problem in the Kafka stream binder. The changes in the PR update the binder so that it considers any KafkaConnectionDetails bean when making an admin, consumer, or producer connection to Kafka. If any further discussion is needed, let's have it on the PR, please, as this problem is specific to Spring Cloud Stream. As such, it's off-topic for most people watching this repository.