Description:

We are encountering an issue with our JMS service setup, where we need to connect to multiple JMS servers/brokers dynamically. We have implemented this functionality using Spring Boot 3.3.3, following the guidelines from Baeldung's article on dynamic bean registration.

Issue: - The JMS service is successfully receiving messages. - However, the ObservationRegistry is not being configured properly for the DefaultJmsListenerContainerFactory and JmsTemplate beans, leading to problems with monitoring and observation. It's worth noting that the ObservationRegistry is correctly set up when we create each bean individually, including separate beans for each broker and distinct connection factories.

Configuration:

The YAML configuration file (application.yml) is set up as follows:

jms:
  config:
    customer-service:
      connection-factory:
        broker-url: bootstrap1:443
        user-name: user1
        password: password1
    employee-service:
      connection-factory:
        broker-url: bootstrap2:443
        user-name: user2
        password: password2
    user-service:
      connection-factory:
        broker-url: bootstrap3:443
        user-name: user3
        password: password3
management:
  zipkin:
    tracing:
      endpoint: ${ZIPKIN_TRACING_ENDPOINT}

  tracing:
    sampling:
      probability: 1.0
    propagation:
      type: ${TRACING_PROPAGATION_TYPE:B3}

Code Implementation:

The JmsDynamicConfig class is designed to dynamically register JMS beans (connection factory and jms template) with names from YML based on the properties defined in the configuration file. The relevant code is as follows:

public class JmsDynamicConfig implements BeanDefinitionRegistryPostProcessor {

  private final Environment environment;
  private final ObservationRegistry observationRegistry;

  public JmsDynamicConfig(Environment environment, ObservationRegistry observationRegistry) {
    this.environment = environment;
    this.observationRegistry = observationRegistry;
  }

  @Override
  public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    // Bind properties using Binder
    JmsProperties jmsProperties = Binder.get(environment)
        .bind("jms", Bindable.of(JmsProperties.class))
        .orElseThrow(() -> new RuntimeException("Failed to bind JMS properties"));

    Map<String, JmsProperties.UseCase> useCases = jmsProperties.getConfig();

    if (useCases == null || useCases.isEmpty()) {
      throw new RuntimeException("No use cases found in JMS properties");
    }

    for (Map.Entry<String, JmsProperties.UseCase> entry : useCases.entrySet()) {
      String useCaseName = entry.getKey();
      JmsProperties.UseCase useCase = entry.getValue();
      try {
        // Create and register DefaultJmsListenerContainerFactory bean
        GenericBeanDefinition factoryBeanDefinition = new GenericBeanDefinition();
        factoryBeanDefinition.setBeanClass(DefaultJmsListenerContainerFactory.class);
        DefaultJmsListenerContainerFactory factory = createJmsListenerContainerFactory(useCase);
        factoryBeanDefinition.setInstanceSupplier(() -> factory);
        registry.registerBeanDefinition(useCaseName, factoryBeanDefinition);

        // Create and register JmsTemplate bean
        GenericBeanDefinition templateBeanDefinition = new GenericBeanDefinition();
        templateBeanDefinition.setBeanClass(JmsTemplate.class);
        JmsTemplate jmsTemplate = createJmsTemplate(connectionFactory(useCase));
        templateBeanDefinition.setInstanceSupplier(() -> jmsTemplate);
        registry.registerBeanDefinition(useCaseName + "JmsTemplate", templateBeanDefinition);

      } catch (Exception e) {
        throw new RuntimeException("Failed to create JMS beans for use case: " + useCaseName, e);
      }
    }
  }

  private DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(JmsProperties.UseCase useCase) throws Exception {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    ConnectionFactory connectionFactory = connectionFactory(useCase);
    factory.setConnectionFactory(connectionFactory);
    factory.setObservationRegistry(observationRegistry);
    return factory;
  }

  private JmsTemplate createJmsTemplate(ConnectionFactory connectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(connectionFactory);
    jmsTemplate.setObservationRegistry(observationRegistry);
    return jmsTemplate;
  }

  private ConnectionFactory connectionFactory(JmsProperties.UseCase useCase) throws Exception {
    ActiveMQConnectionFactory connectionFactory = setupConnectionFactory(useCase);
    return connectionFactory;
  }

  private ActiveMQConnectionFactory setupConnectionFactory(JmsProperties.UseCase useCase) {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(useCase.getConnectionFactory().getBrokerURL());
    connectionFactory.setUserName(useCase.getConnectionFactory().getUserName());
    connectionFactory.setPassword(useCase.getConnectionFactory().getPassword());
    connectionFactory.setUseAsyncSend(true);
    return connectionFactory;
  }

  @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    // No implementation needed
  }
}

JMS Listener Code:

We are dynamically registering the DefaultJmsListenerContainerFactory with names from the YAML configuration file. The @JmsListener annotations are set up as follows:

@JmsListener(destination = "Queue1", containerFactory = "customer-service")
public void receivedJmsMessageCustomerService(Message message) {
  log.info("Received JMS message from customer-service: {}", message);
}

@JmsListener(destination = "Queue1", containerFactory = "employee-service")
public void receivedJmsMessageEmployeeService(Message message) {
  log.info("Received JMS message from employee-service: {}", message);
}

Logs from JMS Listener

The following logs show the B3 headers being sent by sender and are present in message baggage, but not correctly associated with traceId and spanId:

21:09:16.924 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO  [jms-service, traceId: , spanId: ] --- receivedJmsMessageFromInternalTestQueue : ActiveMQTextMessage { destination = queue://rs.internaltest, transactionId = null, deliveryTime = 0, expiration = 0, properties = {b3=66d714d5abd5731basreere4af37165c-64b5c2f05wrwr6122438-1-f0f9a3werwrewr14af37wer165c}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Connectivity Testing}

Expected Behavior:

  • The ObservationRegistry should be correctly configured and associated with both DefaultJmsListenerContainerFactory and JmsTemplate beans.
  • This would enable proper monitoring and observation of the JMS operations.

Steps to Reproduce:

  1. Configure the application with multiple JMS brokers as shown in the YAML configuration.
  2. Implement the dynamic registration of JMS beans as described.
  3. Observe that while JMS messages are received, the ObservationRegistry is not being set up correctly for the beans.

Additional Information:

  • Spring Boot Version: 3.3.3
  • Dependency: ActiveMQ

Comment From: philwebb

Could you please provide a sample application that replicates the issue.

Comment From: hemantmaersk

Sure I can give sample application with placeholders in YML. You can add the same bootstrap server details in all use case.

Comment From: hemantmaersk

Issue Summary:

We have two projects set up to demonstrate a problem with Micrometer tracing across JMS message queues:

  • MicrometerJmsDynamicFactoryIssue (Port 8081): Repository Link - Demonstrates dynamic creation of JMS factories for multiple brokers.
  • MicrometerJmsStaticFactoryTracing (Port 8082): Repository Link - Shows distinct JMS factory with custom configurations for JMS server.

Both projects include sender and receiver code. Here’s how to reproduce the issue:

  1. Set Up ActiveMQ Locally:

  2. Download ActiveMQ: Go to the Apache ActiveMQ download page and download the binary distribution suitable for your operating system.

  3. Extract and Start: Extract the downloaded archive. Navigate to the bin directory.

    • On Unix-based systems, run ./activemq start.
    • On Windows, use activemq.bat start.
  4. Connect to ActiveMQ: By default, ActiveMQ runs on tcp://localhost:61616. You can use this URL to connect your JMS client.

  5. Checkout and Run MicrometerJmsStaticFactoryTracing:

  6. Use localhost:8082/sendJmsMessage to send a message to test-queue2.
  7. Verify that the trace ID and span ID are printed in the logs.
  8. following are logs showing traceId and spanId captured by the application

```text 12:17:57.092 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO [MicrometerJmsStaticFactoryTracing, traceId: 66d8029d654e075cd9a5bc8f7de5ea46, spanId: cf544dd051710628]


3. **Checkout and Run MicrometerJmsDynamicFactoryIssue:**
   - Use `localhost:8081/sendJmsMessage` to send a message to `test-queue1`.
   - Notice that trace ID and span ID are not printed in the logs.
   - Following are logs showing traceId and spanId not captured by the application 

 ```text 
12:19:13.589 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO  [MicrometerJmsDynamicFactoryIssue, traceId: , spanId: ]
  1. Reproduce the Issue with a Queue Swap:
  2. Shut down MicrometerJmsStaticFactoryTracing.
  3. Change the queue configuration of JMS Sender present in JmsSenderController of MicrometerJmsStaticFactoryTracing from test-queue2 to test-queue1.
  4. Restart MicrometerJmsStaticFactoryTracing.
  5. Send a message using localhost:8082/sendJmsMessage and verify that trace ID and span ID are printed in the sender logs of MicrometerJmsStaticFactoryTracing. The baggage with tracing information will also be added automatically with Trace ID.

Logs showing traceId and spanId

Sender Logs: 12:49:57.168 [http-nio-8082-exec-3] INFO  [MicrometerJmsStaticFactoryTracing, traceId: 66d80a1d9d128345da000a699f92d282, spanId: da000a699f92d282] --- Message sent to queue: test-queue1

  1. Observe the Behavior in MicrometerJmsDynamicFactoryIssue:
  2. Since MicrometerJmsDynamicFactoryIssue is now listening on test-queue1, it should receive the message sent after queue swapping step 4 mentioned above.
  3. Notice that while the message properties include baggage with B3 headers (trace ID, span ID), these are not automatically captured or processed by MicrometerJmsDynamicFactoryIssue. The fields Trace Id: Span Id are blank in logging pattern.
  4. Receiver Logs in MicrometerJmsDynamicFactoryIssue showing traceId: , spanId: are empty but B3 headers and tracing information is present in baggage against properties field with have b3 header and tracing information as properties = {b3=66d80a1d9d128345da000a699f92d282-c64ad034595f05ae-0-da000a699f92d282

Detailed log

12:49:57.175 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO  [MicrometerJmsDynamicFactoryIssue, traceId: , spanId: ] --- Received Message : ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:ADFASDFASF-58905-1725434342127-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID: ADFASDFASF-58905-1725434342127-1:2:1:1, destination = queue://test-queue1, transactionId = null, deliveryTime = 0, expiration = 0, timestamp = 1725434397159, arrival = 0, brokerInTime = 1725434397161, brokerOutTime = 1725434397165, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1b545c59, marshalledProperties = org.apache.activemq.util.ByteSequence@63176ab4, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {b3=66d80a1d9d128345da000a699f92d282-c64ad034595f05ae-0-da000a699f92d282, Country=India, Age=25, Name=Rahul}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Jms Testing}

Expected Behavior:

MicrometerJmsDynamicFactoryIssue should capture and log trace IDs and span IDs similarly to MicrometerJmsStaticFactoryTracing, particularly when receiving messages with tracing information.

Actual Behavior:

MicrometerJmsDynamicFactoryIssue does not capture the trace ID and span ID automatically, even though the message received includes the necessary B3 headers.

Additional Information:

This issue may be related to the configuration or implementation differences between the dynamic and static / distinct factory setups. Any insights or suggestions for resolving this inconsistency would be greatly appreciated.

Comment From: philwebb

ActiveMQ can also be started with:

$ docker run -it -p61616:61616 -p8161:8161 apache/activemq-classic

Comment From: philwebb

I don't think this is a Spring Boot bug. I think the problems you're seeing probably come from accidental early initialization of beans. If you change the log level for org.springframework back to INFO, you can see the following:

13:11:47.420 [main] WARN  [MicrometerJmsDynamicFactoryIssue, traceId: , spanId: ] --- Cannot enhance @Configuration bean definition 'jmsConfiguration' since its singleton instance has been created too early. The typical cause is a non-static @Bean method with a BeanDefinitionRegistryPostProcessor return type: Consider declaring such methods as 'static' and/or marking the containing configuration class as 'proxyBeanMethods=false'.

You JmsConfiguration.jmsConsumerConfig should be a static method, and furthermore should not have the ObservationRegistry injected. Your BeanDefinitionRegistryPostProcessor should also only add bean definitions, it should not attempt to get the ObservationRegistry until the instance supplier is called.

If you have any further questions, I would suggest asking on stackoverflow.com as we prefer to keep this issue tracker for bugs and enhancements only.