I am using ActiveMQ 5.15.9 and Spring Boot 2.2.12.RELEASE. I have three queues, "aaa", "bbb" and "manipulator", with the following concurrentConsumers/maxConcurrentConsumers settings:
- "aaa": 1-2
- "bbb": 1-1
- "manipulator": 1-1
So the maximum number of threads for JMS consumers is 2 + 1 + 1 = 4. I created the following thread pool (maximumPoolSize is 4):
@Bean(name = "jmsTaskExecutor")
public Executor jmsTaskExecutor() {
    return new ThreadPoolExecutor(0, 4, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());
}
and the following jmsListenerContainerFactory bean:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrency(concurrency); // default value is "1-1"
    factory.setSessionTransacted(true);
    factory.setTaskExecutor(jmsTaskExecutor); // here is my thread pool
    factory.setMaxMessagesPerTask(10);
    factory.setReceiveTimeout(3000L);
    factory.setErrorHandler(t -> {
        if (t.getCause() != null && !(t.getCause() instanceof RepeatMessageException ||
                t.getCause() instanceof KeeperApiException)) {
            log.error("Unknown error in JMS", t);
        }
    });
    ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory)
            ((JmsPoolConnectionFactory) connectionFactory).getConnectionFactory();
    activeMQConnectionFactory.getRedeliveryPolicyMap().setDefaultEntry(defaultEntry);
    activeMQConnectionFactory.getRedeliveryPolicyMap().setRedeliveryPolicyEntries(redeliveryPolicyEntries);
    return factory;
}
I sent 10 messages to the "aaa" queue from another application, and in the consumer logs I see:
...
08-02-2021 12:24:35.887 [pool-1-thread-4] INFO api - Receive new message: [queue: ='aaa', text = 'Hello world! I'am 3']
08-02-2021 12:24:35.888 [pool-1-thread-2] INFO api - Receive new message: [queue: ='aaa', text = 'Hello world! I'am 2']
08-02-2021 12:24:37.589 [pool-1-thread-3] ERROR o.s.j.l.DefaultMessageListenerContainer - All scheduled consumers have been paused, probably due to tasks having been rejected. Check your thread pool configuration! Manual recovery necessary through a start() call.
08-02-2021 12:24:37.589 [pool-1-thread-1] ERROR o.s.j.l.DefaultMessageListenerContainer - All scheduled consumers have been paused, probably due to tasks having been rejected. Check your thread pool configuration! Manual recovery necessary through a start() call.
08-02-2021 12:24:37.888 [pool-1-thread-4] INFO api - Finish message: [queue: ='aaa', text = 'Hello world! I'am 3']
08-02-2021 12:24:37.888 [pool-1-thread-2] INFO api - Finish message: [queue: ='aaa', text = 'Hello world! I'am 2']
...
After that, if I send a message to the "bbb" or "manipulator" queue (from the same external application), my consumer does not receive it. I think these consumers are paused. Why?
Comment From: ghost
The problem is in the RejectedExecutionHandler:
08-02-2021 15:40:01.817 [pool-1-thread-3] DEBUG o.s.j.l.DefaultMessageListenerContainer - Listener container task [org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@92b1bda] has been rejected and paused: java.util.concurrent.RejectedExecutionException: Task org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@92b1bda rejected from java.util.concurrent.ThreadPoolExecutor@5ca22e19[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
08-02-2021 15:40:01.817 [pool-1-thread-1] DEBUG o.s.j.l.DefaultMessageListenerContainer - Listener container task [org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@58690f5e] has been rejected and paused: java.util.concurrent.RejectedExecutionException: Task org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@58690f5e rejected from java.util.concurrent.ThreadPoolExecutor@5ca22e19[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
Hmm. What is the correct RejectedExecutionHandler implementation if I want a rejected task to wait until the thread pool is ready to process it?
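The rejection itself can be reproduced with plain JDK classes, which may help when reasoning about the pool size. The sketch below is only an illustration: the class name RejectionDemo and the method extraTaskRejected are hypothetical, and it assumes (as the log above suggests, since the rejection is reported from a pool thread) that a listener task is resubmitted while all four pool threads are still occupied. It uses the same ThreadPoolExecutor arguments as the jmsTaskExecutor bean.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    /**
     * Keeps four pool threads busy, then submits one more task.
     * Returns true if that extra task is rejected.
     * Hypothetical helper; expects maximumPoolSize >= 4.
     */
    static boolean extraTaskRejected(int maximumPoolSize) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maximumPoolSize, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch allBusy = new CountDownLatch(4);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    allBusy.countDown();
                    try {
                        release.await(); // stand-in for a long-running listener task
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allBusy.await(); // all four threads are now active
            try {
                // SynchronousQueue has no capacity, so this task needs an idle or new thread.
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                // The default handler (AbortPolicy) throws when no thread is available.
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max=4 rejected: " + extraTaskRejected(4)); // prints "max=4 rejected: true"
        System.out.println("max=5 rejected: " + extraTaskRejected(5)); // prints "max=5 rejected: false"
    }
}
```

With maximumPoolSize = 4 the extra submission is rejected, which matches "pool size = 4, active threads = 4, queued tasks = 0" in the log. This only shows why the rejection occurs; it does not establish how much headroom the listener container actually needs.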
Comment From: wilkinsona
@alexeyvip The Spring Boot issue tracker isn't the right place for questions like this as DefaultMessageListenerContainer is part of Spring Framework. Also, as mentioned in the guidelines for contributing, we prefer to use GitHub issues only for bugs and enhancements. Stack Overflow and Gitter are both better places for this sort of thing.