I have a problem with reconnection when SingleConnectionFactory (CachingConnectionFactory) is used.
I think it is similar to (or the same as?) the problem reported in issue #23058.
From all the sources I have (logs, heap dump ...), it seems that the problem is caused by a missing ExceptionListener
on the connection (ActiveMQConnection) that is stored in SingleConnectionFactory.
I have the following environment:
- Apache ActiveMQ Broker 5.16.2
- failover:(tcp://localhost:61616)?maxReconnectAttempts=5
- SingleConnectionFactory (CachingConnectionFactory)
- setReconnectOnException(true)
Logs
- it is visible that "resetting the underlying JMS Connection" on CachingConnectionFactory is invoked, but suddenly these logs are no longer present and only logs from DefaultMessageListenerContainer are present
- I know that it is not recommended to use DMLC with CachingConnectionFactory, but please bear with me for a moment (because I think the problem is in SingleConnectionFactory)
2022-09-06T05:16:06.024+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:18:37.210+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:21:08.254+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:23:39.319+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:26:15.367+00:00 {ActiveMQ Connection Executor: unconnected} [CachingConnectionFactory] [INFO] Encountered a JMSException - resetting the underlying JMS Connection
...
2022-09-06T05:26:25.498+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
2022-09-06T05:26:30.499+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: The JMS
...
2022-09-06T05:46:30.559+00:00 {taskExecutor-142} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=242, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
2022-09-06T05:46:35.559+00:00 {taskExecutor-151} [DefaultMessageListenerContainer] [ERROR] Could not refresh JMS Connection for destination '...' - retrying using FixedBackOff{interval=5000, currentAttempts=243, maxAttempts=unlimited}. Cause: The JMS connection has failed: ...cluster.local
...
Heap dump
- I see only one CachingConnectionFactory
- I see SingleConnectionFactory$AggregatedExceptionListener in aggregatedExceptionListener
- I see ActiveMQConnection under the connection field
- I see that exceptionListener (on ActiveMQConnection) is null (the ExceptionListener is missing, but this exception listener must be present to trigger a reset of the underlying JMS Connection stored in SingleConnectionFactory)
- I see that transportFailed (on ActiveMQConnection) is true (this is probably the reason why the ExceptionListener is missing, see below)
- I see that firstFailureError (on ActiveMQConnection) is java.net.UnknownHostException (but the type of the failure is not too important)
Debugger
- I am able to simulate the problem in a debugger
- It is enough to "delay" the thread that invokes SingleConnectionFactory.initConnection() after the new connection is created, but before the connection is prepared (which is where the exception listener is established)
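The window described above can also be reproduced deterministically, without a debugger, in a small stand-alone model. This is only a sketch under stated assumptions: `RaceConnection` and `RaceWindowDemo` are hypothetical classes, not ActiveMQ or Spring API, and they mimic just the behaviour reported here (a transport failure marks the connection as failed, after which registering an exception listener is rejected). Two latches play the role of the debugger breakpoint, forcing the failure to land after the connection is created and before its listener is registered.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ActiveMQConnection: it only models the
// transportFailed / exceptionListener interplay described in this issue.
class RaceConnection {
    private volatile boolean transportFailed;
    private volatile Runnable exceptionListener;

    // Mimics the transport reporting a failure (e.g. UnknownHostException).
    void onTransportFailure() {
        transportFailed = true;
        Runnable listener = exceptionListener;
        if (listener != null) {
            listener.run(); // in Spring this would lead to resetConnection()
        }
    }

    // Mimics a failed connection rejecting further configuration.
    void setExceptionListener(Runnable listener) {
        if (transportFailed) {
            throw new IllegalStateException("connection has failed");
        }
        exceptionListener = listener;
    }

    boolean isTransportFailed() { return transportFailed; }
    boolean hasExceptionListener() { return exceptionListener != null; }
}

// Hypothetical driver that forces the failure into the create/prepare window.
final class RaceWindowDemo {
    static RaceConnection reproduce() {
        CountDownLatch created = new CountDownLatch(1);
        CountDownLatch failed = new CountDownLatch(1);
        AtomicReference<RaceConnection> shared = new AtomicReference<>();

        Thread init = new Thread(() -> {
            RaceConnection con = new RaceConnection();
            shared.set(con);                 // like "this.connection = doCreateConnection();"
            created.countDown();
            try {
                failed.await();              // the "delay" between creation and preparation
                con.setExceptionListener(() -> { }); // like prepareConnection(this.connection)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IllegalStateException e) {
                // Preparation failed, but the shared reference already points at con.
            }
        });
        init.start();

        try {
            created.await();
            shared.get().onTransportFailure(); // the failure arrives inside the window
            failed.countDown();
            init.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.get();
    }

    public static void main(String[] args) {
        RaceConnection con = reproduce();
        System.out.println("transportFailed=" + con.isTransportFailed()
                + ", hasExceptionListener=" + con.hasExceptionListener());
    }
}
```

Running `main` prints `transportFailed=true, hasExceptionListener=false`, which is the same combination seen in the heap dump.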
Code (SingleConnectionFactory.initConnection())
public void initConnection() throws JMSException {
    if (getTargetConnectionFactory() == null) {
        throw new IllegalStateException(
                "'targetConnectionFactory' is required for lazily initializing a Connection");
    }
    synchronized (this.connectionMonitor) {
        if (this.connection != null) {
            closeConnection(this.connection);
        }
        this.connection = doCreateConnection();
        prepareConnection(this.connection);
        if (this.startedCount > 0) {
            this.connection.start();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Established shared JMS Connection: " + this.connection);
        }
    }
}
Why is the ExceptionListener on the JMS Connection stored in SingleConnectionFactory missing?
- please consider the following situation
- the new connection is created successfully (this.connection = doCreateConnection();)
- but one of the subsequent methods (in my case prepareConnection(this.connection)) throws a JMSException
- we end up with a new connection (stored in SingleConnectionFactory), but without a JMS ExceptionListener
- as a result, SingleConnectionFactory.resetConnection() is never invoked and the "invalid" connection stays stored in SingleConnectionFactory (until a restart or a "manual" invocation of SingleConnectionFactory.resetConnection())
I know it is (nearly) impossible to get into this situation, because the exception listener should (nearly) always be set up (in prepareConnection(Connection)) before ActiveMQConnection.onException(IOException) is invoked. That method sets transportFailed and firstFailureError, which in turn causes ActiveMQConnection.setExceptionListener(ExceptionListener) (invoked in prepareConnection(this.connection)) to throw ConnectionFailedException. Nevertheless, I see exactly this state in the application heap dump (I am sorry).
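The consequence can be modelled in a few lines. In this sketch, `SimConnection` and `SimFactory` are hypothetical classes, not Spring or ActiveMQ API; `initConnection()` copies only the ordering of the original Spring method quoted above (assign the field first, then prepare the connection), and `IllegalStateException` stands in for the `ConnectionFailedException` reported here. When preparation throws, the field still holds the new connection, no listener is registered, and `resetConnection()` is never reached.

```java
// Hypothetical model of a connection that rejects a listener once it has failed.
class SimConnection {
    boolean transportFailed;
    Runnable exceptionListener;

    void setExceptionListener(Runnable listener) {
        if (transportFailed) {
            // Stand-in for ActiveMQConnection throwing ConnectionFailedException
            throw new IllegalStateException("connection has failed");
        }
        exceptionListener = listener;
    }
}

// Hypothetical model that copies the ordering of SingleConnectionFactory.initConnection().
class SimFactory {
    SimConnection connection;
    int resetCount;
    // When true, the next created connection has already failed before preparation.
    boolean failBeforePrepare;

    SimConnection doCreateConnection() {
        SimConnection con = new SimConnection();
        con.transportFailed = failBeforePrepare;
        return con;
    }

    void prepareConnection(SimConnection con) {
        con.setExceptionListener(this::resetConnection);
    }

    void resetConnection() {
        resetCount++;
        connection = null;
    }

    // Original ordering: the field is assigned before preparation succeeds.
    void initConnection() {
        connection = doCreateConnection();
        prepareConnection(connection);
    }

    public static void main(String[] args) {
        SimFactory factory = new SimFactory();
        factory.failBeforePrepare = true;
        try {
            factory.initConnection();
        } catch (IllegalStateException ex) {
            System.out.println("initConnection failed: " + ex.getMessage());
        }
        // The failed connection is still cached and has no listener that could reset it.
        System.out.println("cached=" + (factory.connection != null)
                + ", listener=" + (factory.connection.exceptionListener != null)
                + ", resets=" + factory.resetCount);
    }
}
```

Running `main` reports the failure and then `cached=true, listener=false, resets=0`: every later caller gets the same dead connection until something resets it by hand.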
My proposal is to change SingleConnectionFactory.initConnection() so that the new connection is created and configured/prepared in a method-local variable and, only when all of that succeeds, assigned to the instance connection field. Something like this:
public void initConnection() throws JMSException {
    ...
    synchronized (this.connectionMonitor) {
        if (this.connection != null) {
            closeConnection(this.connection);
        }
        Connection con = doCreateConnection();
        try {
            prepareConnection(con);
            this.connection = con;
        }
        catch (JMSException ex) {
            try {
                con.close();
            }
            catch (Throwable th) {
                logger.warn("Could not close new (but not used as shared) JMS Connection", th);
            }
            throw ex;
        }
        if (this.startedCount > 0) {
            this.connection.start();
        }
        ...
    }
}
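The effect of the proposed ordering can be checked with a stand-alone model. `DraftConnection` and `DraftFactory` are hypothetical classes, not Spring API, and `IllegalStateException` stands in for `JMSException`: the connection is prepared in a local variable, closed if preparation fails, and published to the shared field only once preparation succeeds, so a later call starts from a clean state.

```java
// Hypothetical stand-in for a JMS connection.
class DraftConnection {
    boolean transportFailed;
    boolean closed;
    Runnable exceptionListener;

    void setExceptionListener(Runnable listener) {
        if (transportFailed) {
            throw new IllegalStateException("connection has failed");
        }
        exceptionListener = listener;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical model of the proposed initConnection() ordering.
class DraftFactory {
    DraftConnection connection;
    DraftConnection lastCreated;
    // When true, the next created connection has already failed before preparation.
    boolean failBeforePrepare;

    DraftConnection doCreateConnection() {
        DraftConnection con = new DraftConnection();
        con.transportFailed = failBeforePrepare;
        lastCreated = con;
        return con;
    }

    void resetConnection() {
        connection = null;
    }

    void initConnection() {
        if (connection != null) {
            connection.close();
        }
        DraftConnection con = doCreateConnection();
        try {
            con.setExceptionListener(this::resetConnection); // like prepareConnection(con)
            connection = con; // publish only after preparation succeeded
        } catch (IllegalStateException ex) {
            con.close(); // never leave a half-prepared connection behind
            throw ex;
        }
    }

    public static void main(String[] args) {
        DraftFactory factory = new DraftFactory();
        factory.failBeforePrepare = true;
        try {
            factory.initConnection();
        } catch (IllegalStateException ex) {
            System.out.println("first attempt failed, cached=" + (factory.connection != null)
                    + ", closed=" + factory.lastCreated.closed);
        }
        factory.failBeforePrepare = false; // e.g. the broker host resolves again
        factory.initConnection();
        System.out.println("second attempt cached=" + (factory.connection != null)
                + ", listener=" + (factory.connection.exceptionListener != null));
    }
}
```

Running `main` prints `first attempt failed, cached=false, closed=true` followed by `second attempt cached=true, listener=true`. Like the proposal, the sketch closes a previously cached connection without clearing the field, so it is only exercised here starting from an empty field.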
Comment From: simonbasle
Superseded by gh-29116