Manuel Jordan opened SPR-16941 and commented
I have two ActiveMQ
servers already started. Consider primary and secondary.
The following works fine when the app is started
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic","/queue").setRelayHost("192.168.1.88")
.setRelayPort(61613);
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
Consider the current StompBroker
shown above as valid and works how is expected when the app is started and it works by default with the primary ActiveMQ
server. Now consider if the primary server goes down. I need edit/change in runtime StompBroker
to work with the secondary.
With the current API, seems is not possible change in runtime the StompBrokerRelay
to be updated again to use other secondary server.
Not sure if my approach about to work with StompBrokerRelay
is correct or if I should 'retrieve' other Bean or component and apply the new changes.
Affects: 5.0 GA, 5.0.7
Reference URL: https://stackoverflow.com/questions/50650296/spring-websocket-how-update-and-reflect-the-changes-in-runtime-for-the-messageb
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Have you seen this part of the documentation? The paragraph that starts with:
"By default, the STOMP broker relay always connects, and reconnects as needed if connectivity is lost, to the same host and port... "
Comment From: spring-projects-issues
Manuel Jordan commented
Thanks, I didn't. I can see is possible with the snippet code available there.
Just curious if exists a sample code available through the samples in GitHub
Thank You.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
There is sample code in the docs. What's missing?
Comment From: spring-projects-issues
Manuel Jordan commented
Just curious about the following:
private ReactorNettyTcpClient<byte[]> createTcpClient() {
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
// Select address to connect to ...
});
};
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
}
It for the // Select address to connect to ...
part.
Perhaps there is a sample in Github to analyze that part of the code.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
There is nothing specific for that. You could keep track of which one you connected to last, and alternate.
Comment From: spring-projects-issues
Manuel Jordan commented
Ok, I will do a research. You can close this issue.
Thanks for your support
Comment From: spring-projects-issues
Manuel Jordan commented
I did do realize the following. There is no a setTcpClient
method in the StompBrokerRelayRegistration
. The sample code is wrong.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Seems to be there, added recently in 4.3.15 / 5.0.5.
Comment From: spring-projects-issues
Manuel Jordan commented
Thank you. I am working with springFrameworkVersion = '5.0.4.RELEASE'
Comment From: spring-projects-issues
Manuel Jordan commented
I have now the following:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic","/queue")
.setTcpClient(tcpClientConfig.tcpClient());
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
Where tcpClient()
is:
@Bean
TcpOperations<byte[]> tcpClient(){
logger.info("tcpClient ...");
ReactorNettyTcpClient<byte[]> reactorNettyTcpClient = null;
Optional<Broker> brokerOptional = Optional.ofNullable(BrokerContainer.retrieveBroker("server"));
logger.info("{}", brokerOptional.toString());
if(brokerOptional.isPresent()) {
logger.info("{}", brokerOptional.get().toString());
reactorNettyTcpClient =
new ReactorNettyTcpClient<>(
brokerOptional.get().getHost(), brokerOptional.get().getPort(), new StompReactorNettyCodec());
logger.info("{}", reactorNettyTcpClient.toString());
}
else {
logger.error("Optional is empty!!!!");
}
logger.info("... tcpClient");
return reactorNettyTcpClient;
}
Above works fine only when the app is started.
Note: BrokerContainer
is a custom class contains a Map
where it always has updated other custom Broker
object with the valid host/port to connect with ActiveMQ
about Stomp
.
Taking in assumption the mentioned in:
By default, the STOMP broker relay always connects, and reconnects as needed if connectivity is lost, to the same host and port. If you wish to supply multiple addresses, on each attempt to connect, you can configure a supplier of addresses, instead of a fixed host and port
From above:
If you wish to supply multiple addresses, on each attempt to connect, you can configure a supplier of addresses
I "understood" with the sample code from the Reference documentation the following:
* If the ActiveMQ
server goes down, the tcpClient()
method should be called again automatically and thus use the new data retrieved through BrokerContainer
. I have confirmed that tcpClient()
never is called
Therefore I am assuming the following, according with the comment
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
// Select address to connect to ... <-----------
});
};
That from the beginning or just one time (at startup) should be available a set of objects that represents the ActiveMQ
servers.
But What happens if in runtime a new server is added?
Or what happens if the secondary server was offline (the app is started) and latter that secondary server is online again. Because my tcpClient()
never is called again. I am not able to reflect the new change.
Was is missing in my code?
Thank you
Comment From: spring-projects-issues
Rossen Stoyanchev commented
It says there on the Javadoc that setting the TcpClient is mutually exclusive with configuring a host/port.
Don't think of anything complicated. Literally, just return the address to use.
Comment From: spring-projects-issues
Manuel Jordan commented
About the Javadoc
, I understand that setRelayHost
and setRelayPort
methods are replaced by setTcpClient
. I understood that from the beginning. What I tried to say is about the following:
I have:
@Data
@AllArgsConstructor
public class Broker {
private String host;
private Integer port;
}
That is used in (through an Optional<>
):
reactorNettyTcpClient =
new ReactorNettyTcpClient<>(
brokerOptional.get().getHost(),
brokerOptional.get().getPort(),
new StompReactorNettyCodec());
Again the following is called just once when the server is started.
@Bean
TcpOperations<byte[]> tcpClient(){
...
reactorNettyTcpClient = ...
...
}
Thus, how add a new ReactorNettyTcpClient
when the app has been started?
When the primary server is down I need use the second ReactorNettyTcpClient
that represents the secondary server.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
But why are you trying to get those values from the broker relay? They're supposed to be passed into the broker relay, not the other way around. Like I said, simply return the addressed to use in place of that comment.
Comment From: spring-projects-issues
Manuel Jordan commented
Consider my current problem:
* ActiveMQ
server one 192.168.1.3
* ActiveMQ
server two 192.168.1.4
Let's assume for simplicity the ports are the same, of course the ip are different
With my current code: the app starts up with ActiveMQ (server One), it through
config.enableStompBrokerRelay("/topic", "/queue").setTcpClient(tcpClientConfig.tcpClient());
Where the source code for the tcpClient()
method is already shared before
Until here all is fine: * Spring Framework + Spring WebSocket (Stomp) + ActiveMQ (server One) all work in peace
Case: now in runtime, consider if ActiveMQ (server One) goes down, but ActiveMQ (server Two) is available.
How I configure Spring(WebSocket) to use ActiveMQ (server Two)? (it in runtime). I can't change it currently with the current API.
That's is my problem: how update the MessageBrokerRegistry
object in runtime.
Spring tries to reconnect with the ActiveMQ (server One)
yet. And I receive:
[tcp-client-loop-nio-4] WARN o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: Operation timed out: /192.168.1.3:61613
io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.1.3:61613
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Operation timed out
... 10 common frames omitted
Comment From: spring-projects-issues
Rossen Stoyanchev commented
How I configure Spring(WebSocket) to use ActiveMQ (server Two)? (it in runtime). I can't change it currently with the current API.
Hm, why? It's a java.util.function.Supplier
and that's invoked on each attempt to connect.
Comment From: spring-projects-issues
Manuel Jordan commented
Ok, I am going to change all the following:
@Bean
TcpOperations<byte[]> tcpClient(){
logger.info("tcpClient ...");
ReactorNettyTcpClient<byte[]> reactorNettyTcpClient = null;
Optional<Broker> brokerOptional = Optional.ofNullable(BrokerContainer.retrieveBroker("server"));
logger.info("{}", brokerOptional.toString());
if(brokerOptional.isPresent()) {
logger.info("{}", brokerOptional.get().toString());
reactorNettyTcpClient =
new ReactorNettyTcpClient<>(
brokerOptional.get().getHost(), brokerOptional.get().getPort(), new StompReactorNettyCodec());
logger.info("{}", reactorNettyTcpClient.toString());
}
else {
logger.error("Optional is empty!!!!");
}
logger.info("... tcpClient");
return reactorNettyTcpClient;
}
To work with java.util.function.Supplier
instead.
Thank you ... let's see what happens now
Comment From: spring-projects-issues
Manuel Jordan commented
Ok, seems I have all this resolved. I have the following now:
@Bean
TcpOperations<byte[]> tcpClient_(){
logger.info("tcpClient_ ...");
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
//Alpha
logger.info("connectAddress ...");
logger.info(" SA: {}", socketAddressSupplier().get().toString());
return socketAddressSupplier().get();
});
};
logger.info("... tcpClient_");
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
}
Supplier<Broker> brokerSupplier(){
logger.info("brokerSupplier ...");
Supplier<Broker> brokerSupplier = () -> BrokerContainer.retrieveBroker("server");
logger.info(" {}", brokerSupplier.get().toString());
return brokerSupplier;
}
Supplier<? extends SocketAddress> socketAddressSupplier(){
logger.info("socketAddressSupplier ...");
Supplier<? extends SocketAddress> socketAddressSupplier = () -> new InetSocketAddress(brokerSupplier().get().getHost(), brokerSupplier().get().getPort());
logger.info(" {}", socketAddressSupplier.get().toString());
return socketAddressSupplier;
}
I can confirm that Alpha
is executed for a second time and detects now the secondary server how was my requirement from the beginning.
Just after some time. Just only one time appears the following about the 'first' server
[tcp-client-loop-nio-4] WARN o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: Operation timed out: /192.168.1.3:61613
io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.1.3:61613
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Operation timed out
... 10 common frames omitted
I am assuming it is the final attempt to connect to the server unavailable, and finally that 'connection' is destroyed or discarded by complete. Am I correct?
Apart: after I have returned to lunch I got just once:
[tcp-client-loop-nio-2] WARN o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: No messages received in 30000 ms.
I am assuming it is due inactivity
Thanks for your guidance.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
This way of using Supplier's doesn't look right:
() -> new InetSocketAddress(
brokerSupplier().get().getHost(),
brokerSupplier().get().getPort());
Shouldn't brokerSupplier()
be called only once in this case? Calling twice could give you a different pair of host and port values.
Comment From: spring-projects-issues
Manuel Jordan commented
Agree, that was the 'raw' version. Currently I have the following:
@Bean
TcpOperations<byte[]> tcpClient_(){
logger.info("tcpClient_ ...");
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
logger.info("connectAddress ...");
SocketAddress socketAddress = socketAddressSupplier().get();
logger.info(" {}", socketAddress.toString());
return socketAddress;
});
};
logger.info("... tcpClient_");
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
}
Supplier<? extends SocketAddress> socketAddressSupplier(){
logger.info("socketAddressSupplier ...");
Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
StompBrokerServer stompBrokerServer =
StompBrokerServerContainer.retrieveStompBrokerServer("server");
return new InetSocketAddress(stompBrokerServer.getHost(), stompBrokerServer.getPort());
};
return socketAddressSupplier;
}
And I have the following just one time (all together - one block)
09:50:47.357 97937 [tcp-client-loop-nio-4] WARN o.s.m.s.s.StompBrokerRelayMessageHandler - TCP connection failure in session _system_: Failed to connect: connection timed out: /192.168.1.3:61613
io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.1.3:61613
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:125)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:465)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO c.m.j.c.websocket.TcpClientConfig - connectAddress ...
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO c.m.j.c.websocket.TcpClientConfig - socketAddressSupplier ...
09:50:47.358 97938 [tcp-client-loop-nio-4] INFO c.m.j.c.websocket.TcpClientConfig - /127.0.0.1:61613
09:50:47.454 98034 [tcp-client-scheduler-1] INFO o.s.m.s.s.StompBrokerRelayMessageHandler - "System" session
From above observe that finally fails the primary unavailable server Failed to connect: connection timed out: /192.168.1.3:61613
and just then the secondary available server is detected (c.m.j.c.websocket.TcpClientConfig - 127.0.0.1:61613)
and is used how the new broker for stomp.
Is it the expected behaviour? Fail a latest time (for primary server) and work with other address (for the secondary server)? I am most concerned about the former part, if I should see that exception about the primary server
Thanks for your support
Comment From: PaulGobin
Did anyone figure out how to auto connect to a different broker in the list if the connected broker becomes unavailable? I couldn't find a way to detect the "lostConnection event handler or callback". If I could, I can add code to reconnect to a different broker. Maybe an external load balancer will do the trick?