Biju Kunjummen opened SPR-12452 and commented
Spring WebSocket supports client STOMP subscriptions through a message broker relay component, which in turn talks to full-fledged STOMP brokers such as RabbitMQ and ActiveMQ.
RabbitMQ supports clustered deployment. For native AMQP messaging, the client connection factory has a way to specify all the hosts in the cluster (org.springframework.amqp.rabbit.connection.AbstractConnectionFactory#setAddresses). To support a similar set-up for a cluster of RabbitMQ servers over STOMP, one option is to use a load balancer; however, if support can be provided natively within the Spring WebSocket relay, that would be even better.
Another potential option is to support STOMP from the client but then translate it to native AMQP or JMS.
Affects: 4.1.2
Issue Links:
- #21341 Add description for StompBrokerRelayMessageHandler
- #22055 ReactorNettyTcpClient constructor with callback to initialize TcpClient
Referenced from: commits https://github.com/spring-projects/spring-framework/commit/88a17a4b10a110eee9c30585999a275ccffa3c17, https://github.com/spring-projects/spring-framework/commit/d512cca3fde8759490abdc7222f54cb096a8097a
Backported to: 4.3.15
6 votes, 15 watchers
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Biju Kunjummen, after taking a closer look, this appears to be a little more involved than anticipated, since it's not merely about the initial connect but also about the reconnect logic on which we rely for the shared "system" connection. At present the TcpClient from Reactor is built to work with a single address. Yet the Reconnect type it accepts in one of its open method variants allows returning a different address. I am reluctant to proceed until there is some clarity around what changes might be made in Reactor related to this use case. I'm going to push this back to 4.1.4 to allow more time. We might still do something in that time frame, or alternatively, if we need a Reactor 2.0 release, this may get pushed to 4.2.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
I've opened this ticket in the reactor project https://github.com/reactor/reactor/issues/412. At this point a 4.2 target seems most likely.
Comment From: spring-projects-issues
Biju Kunjummen commented
Thanks for following up, Rossen. The timeline works. The client has already decided to go for a hardware load balancer for the RabbitMQ STOMP cluster, and the native load-balancing approach with Spring WebSocket/Reactor is a nice-to-have for them.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Unfortunately the Reactor 1.x - 2.0 upgrade happened rather late in the 4.2 cycle so we'll need to postpone this further.
Comment From: spring-projects-issues
Petr Kukrál commented
Rossen Stoyanchev, is there any news about this ticket? I looked at the related ticket in the Reactor project and it seems to have been closed without a resolution. Our team is trying to find out how to horizontally scale RabbitMQ with WebSocket connections over STOMP. The main issue is that we have several Tomcat instances but only one RabbitMQ instance, and we need to feed all clients the same messages over WebSockets. When any of the Tomcat instances gets a message from a shared queue (implemented via another RabbitMQ cluster in HA), it processes the message to its final state and then publishes it to all clients.
Our idea for scaling is that each Tomcat instance connects to one of the RabbitMQ instances in the cluster during startup; later, when clients want to create a WS connection, Tomcat creates it via its dedicated RabbitMQ instance. This is not an issue. But once any Tomcat instance gets a message, it needs to publish it to the whole RabbitMQ cluster and not only to the instance it is connected to. We don't need to reconnect clients to another RabbitMQ instance in the cluster; they can be tied to a RabbitMQ instance based on their Tomcat instance. Client requests to the Tomcat instances are load balanced.
Would you mind telling us whether this is possible with or without changes in the Reactor project, or with changes made only in the Spring WebSocket project?
Comment From: spring-projects-issues
Filip Hrisafov commented
A small suggestion for this. It should not be that difficult to implement a standalone Reactor2TcpClient that supports a Supplier<InetSocketAddress> instead of just the one address.
I managed to do something for myself by using Spec.TcpClientSpec#connect(Supplier<InetSocketAddress>). If this were exposed in a constructor, one could really easily implement something like the ActiveMQ failover protocol. I am also open to providing a patch for this if you are willing to accept it for the 4.3.x branch. I am not sure if it is the same for master (as the Reactor implementation there is different).
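To make the suggestion above concrete, here is a minimal sketch of an address supplier that hands out a different broker address on each call. It uses only the JDK; the class name RotatingAddressSupplier and the comma-separated "host:port" input format are assumptions made for illustration and are not part of Spring or Reactor. How such a supplier is handed to the TCP client depends on the Spring and Reactor versions in use.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper (not a Spring or Reactor class): rotates through broker addresses. */
final class RotatingAddressSupplier implements Supplier<InetSocketAddress> {

    private final List<InetSocketAddress> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    /** Parses a comma-separated "host:port" list, e.g. "192.168.0.1:61613,192.168.0.2:61613". */
    RotatingAddressSupplier(String hostPorts) {
        List<InetSocketAddress> parsed = new ArrayList<>();
        for (String entry : hostPorts.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            parsed.add(new InetSocketAddress(host, port));
        }
        this.addresses = Collections.unmodifiableList(parsed);
    }

    /** Returns the next address in order; floorMod keeps the index valid if the counter overflows. */
    @Override
    public InetSocketAddress get() {
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }
}
```

Because the supplier is consulted each time an address is requested, a failed broker is skipped on the next attempt simply because the rotation has moved on. The sketch does not track broker health, so a dead node will still be offered again once the rotation wraps around.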
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Filip Hrisafov, an extra constructor should be no problem for a 4.3 backport, but we had this issue which I don't believe was fixed in the 2.0.x line. Have you been able to confirm if the issue affects your experiment? If it does, as I suspect is the case, then we can only work on support for this in 5.x.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
I've created Reactor Netty #302 against the current Reactor Netty project.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Filip Hrisafov, based on the discussion under Reactor Netty #302, I see now that the original issue (based on Reactor Net 2.0.x) was related to the Reconnect strategy, but you have found a different way by passing a Supplier<InetSocketAddress> at construction time. Feel free to submit a patch based on your suggestion.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
I'm scheduling for 5.0.5 and 4.3.15 since it sounds like this should work. We just need to confirm it works as expected, and possibly provide extra constructor options to make it a little easier.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
There is now a new Reactor2TcpClient constructor (in 4.3.x) that accepts a Supplier<InetSocketAddress>, along with a config option to plug in the client. There is an example in the docs.
Note that in master (5.x) the ReactorNettyTcpClient already has a constructor that takes a Consumer<ClientOptions.Builder<?>>, so all that needed to be done was to expose the config option and update the docs.
Comment From: spring-projects-issues
Filip Hrisafov commented
Thanks a lot for adding this Rossen. The documentation is amazing :). I know it would help a lot of people
Comment From: spring-projects-issues
Rossen Stoyanchev commented
No worries, and thanks for the tip!
Comment From: spring-projects-issues
Petr Kukrál commented
Thanks a lot too. This will help us with proper load balancing of the clients :)
Comment From: spring-projects-issues
luisalves00 commented
Hi,
I have something like this:
    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config) {
        // Artemis -> tcp://0.0.0.0:61613
        config.enableStompBrokerRelay("/topic").setTcpClient(createTcpClient());
        config.setApplicationDestinationPrefixes("/app");
    }

    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        final List<InetSocketAddress> addressList = new ArrayList<>();
        addressList.add(new InetSocketAddress("192.168.0.1", 61613));
        addressList.add(new InetSocketAddress("192.168.0.2", 61613));
        addressList.add(new InetSocketAddress("192.168.0.3", 61613));
        addressList.add(new InetSocketAddress("192.168.0.4", 61613));
        final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
        final Consumer<ClientOptions.Builder<?>> builderConsumer = b -> {
            b.connectAddress(() -> { return addresses.getNext(); });
        };
        return new ReactorNettyTcpClient<byte[]>(builderConsumer, new StompReactorNettyCodec());
    }
But I think "192.168.0.1" will take all the load until it is down. Then the message broker relay will move to the next IP. Am I wrong?
Comment From: spring-projects-issues
luisalves00 commented
I was wrong. The round robin works fine. Each new consumer will be connected to the next address.
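The RoundRobinList used in the snippet above is not defined in this thread and is not a JDK or Spring class. A minimal sketch of what such a helper might look like follows, assuming only the constructor and the getNext() method that the snippet calls; everything else about it is a guess.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for the RoundRobinList referenced above. */
final class RoundRobinList<T> {

    private final List<T> items;
    private int next = 0; // guarded by "this"

    RoundRobinList(Collection<? extends T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("At least one item is required");
        }
        this.items = new ArrayList<>(items); // defensive copy
    }

    /** Returns the items in order, wrapping around to the first after the last. */
    synchronized T getNext() {
        T item = items.get(next);
        next = (next + 1) % items.size();
        return item;
    }
}
```

With four addresses, consecutive calls return the first, second, third, and fourth address and then start over, which matches the observation that each new connection goes to the next address rather than the first address taking all the load.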
Comment From: spring-projects-issues
Alan So commented
I find that version 5.1 has removed the public constructor "ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer, ReactorNettyCodec<P> codec)". Could you please help to merge it into 5.1?
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Infinity821 see #22055.
Comment From: PaulGobin
Hey guys @spring-issuemaster @rstoyanchev I realize that this is an old post; however, I was wondering how I can use this code. It seems like "ClientOptions" was removed? I am using Spring Boot 2.3.4 and would like to use a list of clustered broker addresses.
Any help will be much appreciated.
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        final List<InetSocketAddress> addressList = new ArrayList<>();
        addressList.add(new InetSocketAddress("192.168.0.1", 61613));
        addressList.add(new InetSocketAddress("192.168.0.2", 61613));
        addressList.add(new InetSocketAddress("192.168.0.3", 61613));
        addressList.add(new InetSocketAddress("192.168.0.4", 61613));
        final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
        final Consumer<ClientOptions.Builder<?>> builderConsumer = b -> {
            b.connectAddress(() -> { return addresses.getNext(); });
        };
        return new ReactorNettyTcpClient<byte[]>(builderConsumer, new StompReactorNettyCodec());
    }
Thanks in advance.
Comment From: kmandalas
@PaulGobin I suppose what is mentioned here: https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-stomp-handle-broker-relay-configure does not cover your case.
I have a similar situation, but with the following difference (I think): in my case I have a 2-node ArtemisMQ cluster in active/passive mode.
@spring-issuemaster please advise; there is a lack of documentation/examples IMO for such cases. I do not think this should be left to StackOverflow questions, so I took the liberty of opening a new issue: https://github.com/spring-projects/spring-framework/issues/26169