Manuel Jordan opened SPR-16941 and commented

I have two ActiveMQ servers already started. Consider primary and secondary.

The following works fine when the app is started

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/topic","/queue").setRelayHost("192.168.1.88")
                                                  .setRelayPort(61613);                                                              
    config.setApplicationDestinationPrefixes("/app");
    config.setUserDestinationPrefix("/user");
}

Consider the current StompBroker shown above as valid and works how is expected when the app is started and it works by default with the primary ActiveMQ server. Now consider if the primary server goes down. I need edit/change in runtime StompBroker to work with the secondary.

With the current API, seems is not possible change in runtime the StompBrokerRelay to be updated again to use other secondary server.

Not sure if my approach about to work with StompBrokerRelay is correct or if I should 'retrieve' other Bean or component and apply the new changes.


Affects: 5.0 GA, 5.0.7

Reference URL: https://stackoverflow.com/questions/50650296/spring-websocket-how-update-and-reflect-the-changes-in-runtime-for-the-messageb

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Have you seen this part of the documentation? The paragraph that starts with:

"By default, the STOMP broker relay always connects, and reconnects as needed if connectivity is lost, to the same host and port... "

Comment From: spring-projects-issues

Manuel Jordan commented

Thanks, I didn't. I can see is possible with the snippet code available there.

Just curious if exists a sample code available through the samples in GitHub

Thank You.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

There is sample code in the docs. What's missing?

Comment From: spring-projects-issues

Manuel Jordan commented

Just curious about the following:

 private ReactorNettyTcpClient<byte[]> createTcpClient() {

        Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
            builder.connectAddress(()-> {
                // Select address to connect to ...
            });
        };

        return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
    }

It for the // Select address to connect to ... part.

Perhaps there is a sample in Github to analyze that part of the code.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

There is nothing specific for that. You could keep track of which one you connected to last, and alternate.

Comment From: spring-projects-issues

Manuel Jordan commented

Ok, I will do a research. You can close this issue.

Thanks for your support

Comment From: spring-projects-issues

Manuel Jordan commented

I did do realize the following. There is no a setTcpClient method in the StompBrokerRelayRegistration. The sample code is wrong.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Seems to be there, added recently in 4.3.15 / 5.0.5. 

Comment From: spring-projects-issues

Manuel Jordan commented

Thank you. I am working with springFrameworkVersion = '5.0.4.RELEASE'

Comment From: spring-projects-issues

Manuel Jordan commented

I have now the following:

@Override
 public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic","/queue")
              .setTcpClient(tcpClientConfig.tcpClient());
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }

Where tcpClient() is:

@Bean
TcpOperations<byte[]> tcpClient(){

    logger.info("tcpClient ...");

    ReactorNettyTcpClient<byte[]> reactorNettyTcpClient = null;

    Optional<Broker> brokerOptional = Optional.ofNullable(BrokerContainer.retrieveBroker("server"));
    logger.info("{}", brokerOptional.toString());

    if(brokerOptional.isPresent()) {
        logger.info("{}", brokerOptional.get().toString());

        reactorNettyTcpClient =
                new ReactorNettyTcpClient<>(
                        brokerOptional.get().getHost(), brokerOptional.get().getPort(), new StompReactorNettyCodec());

        logger.info("{}", reactorNettyTcpClient.toString());

    }
    else {
        logger.error("Optional is empty!!!!");
    }

    logger.info("... tcpClient");

    return reactorNettyTcpClient;
}

Above works fine only when the app is started.

Note: BrokerContainer is a custom class contains a Map where it always has updated other custom Broker object with the valid host/port to connect with ActiveMQ about Stomp.

Taking in assumption the mentioned in:

By default, the STOMP broker relay always connects, and reconnects as needed if connectivity is lost, to the same host and port. If you wish to supply multiple addresses, on each attempt to connect, you can configure a supplier of addresses, instead of a fixed host and port

From above:

If you wish to supply multiple addresses, on each attempt to connect, you can configure a supplier of addresses

I "understood" with the sample code from the Reference documentation the following: * If the ActiveMQ server goes down, the tcpClient() method should be called again automatically and thus use the new data retrieved through BrokerContainer. I have confirmed that tcpClient() never is called

Therefore I am assuming the following, according with the comment

Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
            builder.connectAddress(()-> {
                // Select address to connect to ...   <-----------
            });
        };

That from the beginning or just one time (at startup) should be available a set of objects that represents the ActiveMQ servers. 

But What happens if in runtime a new server is added? Or what happens if the secondary server was offline (the app is started) and latter that secondary server is online again. Because my tcpClient() never is called again. I am not able to reflect the new change.

Was is missing in my code?

Thank you

Comment From: spring-projects-issues

Rossen Stoyanchev commented

It says there on the Javadoc that setting the TcpClient is mutually exclusive with configuring a host/port.

Don't think of anything complicated. Literally, just return the address to use.

Comment From: spring-projects-issues

Manuel Jordan commented

About the Javadoc, I understand that setRelayHost and setRelayPort methods are replaced by setTcpClient. I understood that from the beginning. What I tried to say is about the following:

I have:

@Data
@AllArgsConstructor
public class Broker {

    private String host;
    private Integer port;

}

That is used in (through an Optional<>):

reactorNettyTcpClient =
        new ReactorNettyTcpClient<>(
                brokerOptional.get().getHost(), 
                                brokerOptional.get().getPort(), 
                                new StompReactorNettyCodec());

Again the following is called just once when the server is started.

@Bean
TcpOperations<byte[]> tcpClient(){
 ...
  reactorNettyTcpClient = ...
 ...
}

Thus, how add a new ReactorNettyTcpClient when the app has been started?

When the primary server is down I need use the second ReactorNettyTcpClient that represents the secondary server.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

But why are you trying to get those values from the broker relay? They're supposed to be passed into the broker relay, not the other way around. Like I said, simply return the addressed to use in place of that comment.

Comment From: spring-projects-issues

Manuel Jordan commented

Consider my current problem: * ActiveMQ server one 192.168.1.3 * ActiveMQ server two 192.168.1.4

Let's assume for simplicity the ports are the same, of course the ip are different

With my current code: the app starts up with ActiveMQ (server One), it through

config.enableStompBrokerRelay("/topic", "/queue").setTcpClient(tcpClientConfig.tcpClient());

Where the source code for the tcpClient() method is already shared before

Until here all is fine: * Spring Framework + Spring WebSocket (Stomp) + ActiveMQ (server One) all work in peace

Case: now in runtime, consider if ActiveMQ (server One) goes down, but ActiveMQ (server Two) is available.

How I configure Spring(WebSocket) to use ActiveMQ (server Two)? (it in runtime). I can't change it currently with the current API.

That's is my problem: how update the MessageBrokerRegistry object in runtime.

Spring tries to reconnect with the ActiveMQ (server One) yet. And I receive:

[tcp-client-loop-nio-4] WARN  o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: Operation timed out: /192.168.1.3:61613 
io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.1.3:61613
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Operation timed out
    ... 10 common frames omitted

Comment From: spring-projects-issues

Rossen Stoyanchev commented

How I configure Spring(WebSocket) to use ActiveMQ (server Two)? (it in runtime). I can't change it currently with the current API.

Hm, why? It's a java.util.function.Supplier and that's invoked on each attempt to connect.

Comment From: spring-projects-issues

Manuel Jordan commented

Ok, I am going to change all the following:

    @Bean
    TcpOperations<byte[]> tcpClient(){

        logger.info("tcpClient ...");

        ReactorNettyTcpClient<byte[]> reactorNettyTcpClient = null;

        Optional<Broker> brokerOptional = Optional.ofNullable(BrokerContainer.retrieveBroker("server"));
        logger.info("{}", brokerOptional.toString());

        if(brokerOptional.isPresent()) {
            logger.info("{}", brokerOptional.get().toString());

            reactorNettyTcpClient =
                    new ReactorNettyTcpClient<>(
                            brokerOptional.get().getHost(), brokerOptional.get().getPort(), new StompReactorNettyCodec());

            logger.info("{}", reactorNettyTcpClient.toString());

        }
        else {
            logger.error("Optional is empty!!!!");
        }

        logger.info("... tcpClient");

        return reactorNettyTcpClient;
    }

To work with java.util.function.Supplier instead.

Thank you ... let's see what happens now

Comment From: spring-projects-issues

Manuel Jordan commented

Ok, seems I have all this resolved. I have the following now:

@Bean
TcpOperations<byte[]> tcpClient_(){

    logger.info("tcpClient_ ...");

    Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
            builder.connectAddress(()-> {
                //Alpha
                logger.info("connectAddress ...");
                logger.info("   SA: {}", socketAddressSupplier().get().toString());
                return socketAddressSupplier().get();
            });
        };

       logger.info("... tcpClient_");

       return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
    }

Supplier<Broker> brokerSupplier(){
    logger.info("brokerSupplier ...");
    Supplier<Broker> brokerSupplier = () -> BrokerContainer.retrieveBroker("server");
    logger.info("   {}", brokerSupplier.get().toString());
    return brokerSupplier;
}

Supplier<? extends SocketAddress> socketAddressSupplier(){
    logger.info("socketAddressSupplier ...");
    Supplier<? extends SocketAddress> socketAddressSupplier = () -> new InetSocketAddress(brokerSupplier().get().getHost(), brokerSupplier().get().getPort());
    logger.info("   {}", socketAddressSupplier.get().toString());
    return socketAddressSupplier;
}

I can confirm that Alpha is executed for a second time and detects now the secondary server how was my requirement from the beginning.

Just after some time. Just only one time appears the following about the 'first' server

[tcp-client-loop-nio-4] WARN  o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: Operation timed out: /192.168.1.3:61613 
io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.1.3:61613
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Operation timed out
    ... 10 common frames omitted

I am assuming it is the final attempt to connect to the server unavailable, and finally that 'connection' is destroyed or discarded by complete. Am I correct?

Apart: after I have returned to lunch I got just once:

[tcp-client-loop-nio-2] WARN  o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: No messages received in 30000 ms. 

I am assuming it is due inactivity

Thanks for your guidance.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

This way of using Supplier's doesn't look right:

() -> new InetSocketAddress(
        brokerSupplier().get().getHost(), 
        brokerSupplier().get().getPort());

Shouldn't brokerSupplier() be called only once in this case? Calling twice could give you a different pair of host and port values.

Comment From: spring-projects-issues

Manuel Jordan commented

Agree, that was the 'raw' version. Currently I have the following:

@Bean
TcpOperations<byte[]> tcpClient_(){

   logger.info("tcpClient_ ...");

   Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
       builder.connectAddress(()-> {
           logger.info("connectAddress ...");
           SocketAddress socketAddress = socketAddressSupplier().get();
           logger.info("    {}", socketAddress.toString());
           return socketAddress;
       });
   };

    logger.info("... tcpClient_");

    return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
}

Supplier<? extends SocketAddress> socketAddressSupplier(){
    logger.info("socketAddressSupplier ...");
    Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
    StompBrokerServer stompBrokerServer =        
               StompBrokerServerContainer.retrieveStompBrokerServer("server");
    return new InetSocketAddress(stompBrokerServer.getHost(), stompBrokerServer.getPort());
    };
    return socketAddressSupplier;
}

And I have the following just one time (all together - one block)

09:50:47.357 97937 [tcp-client-loop-nio-4] WARN  o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: connection timed out: /192.168.1.3:61613 
io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.1.3:61613
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:125)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:465)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO  c.m.j.c.websocket.TcpClientConfig - connectAddress ... 
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO  c.m.j.c.websocket.TcpClientConfig - socketAddressSupplier ... 
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO  c.m.j.c.websocket.TcpClientConfig -    /127.0.0.1:61613 
09:50:47.454 98034 [tcp-client-scheduler-1] INFO  o.s.m.s.s.StompBrokerRelayMessageHandler - "System" session

From above observe that finally fails the primary unavailable server Failed to connect: connection timed out: /192.168.1.3:61613 and just then the secondary available server is detected (c.m.j.c.websocket.TcpClientConfig - 127.0.0.1:61613) and is used how the new broker for stomp.

Is it the expected behaviour? Fail a latest time (for primary server) and work with other address (for the secondary server)? I am most concerned about the former part, if I should see that exception about the primary server

Thanks for your support

Comment From: PaulGobin

Did anyone figure out how to auto connect to a different broker in the list if the connected broker becomes unavailable? I couldn't find a way to detect the "lostConnection event handler or callback". If I could, I can add code to reconnect to a different broker. Maybe an external load balancer will do the trick?