Gabriel Dogaru opened SPR-17523 and commented

This is the same issue as #17057

The solution there does not apply anymore because that constructor was removed in this commit https://github.com/spring-projects/spring-framework/commit/ffbc75ae47936c8668c55332e5b70142279f64ac#diff-11144739384955df1f8f38cbcde8d95b


Affects: 5.1.2

Reference URL: #17057

Issue Links: - #20933 Upgrade to Reactor Netty 0.8 - #17057 Spring Websockets Broker relay supporting a cluster of STOMP endpoint addresses

Referenced from: commits https://github.com/spring-projects/spring-framework/commit/24848ec1bcec59d3fc93ddd77a084b637266db93

Comment From: spring-projects-issues

Rossen Stoyanchev commented

The Reactor Netty config API changed from 0.7 to 0.8. We lost the builder to an immutable pattern with config methods returning a new instance of TcpClient.

You still have the constructor taking TcpClient but that requires to take over resource management, which could be done in 5.1+ with the ReactorResourceFactor, but I get the point that an option to provide a supplier of addresses without taking over the management of resource is now missing.

I'll add a constructor with those semantics. Something like this:

public ReactorNettyTcpClient(Function<TcpClient, TcpClient> configurer, ReactorNettyCodec<P> codec) {
    // ...
}

So you could then:

new ReactorNettyTcpClient(client -> client.addressSupplier(...), codec);

I'll also update the sample in the docs.

Comment From: PaulGobin

I think I got it: `private ReactorNettyTcpClient createTcpClient() {

    final List<InetSocketAddress> addressList = new ArrayList<>();

    addressList.add(new InetSocketAddress("192.168.0.2", 61613));
    addressList.add(new InetSocketAddress("192.168.0.3", 61613));
    addressList.add(new InetSocketAddress("192.168.0.4", 61613));
    addressList.add(new InetSocketAddress(StompBrokerRelayHost, StompBrokerRelayPort));
    final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);

    return new ReactorNettyTcpClient<>(client -> client.remoteAddress(() -> addresses.get()), new StompReactorNettyCodec());

}`

Comment From: sharunthomas

Is the above solution works with spring boot 1.5.X ?

Comment From: PaulGobin

I don't believe so, but I would recommend to upgrade to 2.3.4 if you can.

Comment From: sharunthomas

But I cannot see a method called .setTcpClient() in my MessageBrokerRegistry class..

registry.enableStompBrokerRelay("/queue/","/topic/")
                    .setRelayHost(hostname)
                    .setRelayPort(port)
                    .setClientLogin(username)
                    .setClientPasscode(password)
                    .setAutoStartup(true);

so I cannot add all hosts in the rabbitMQ cluster, It would be a big help if you can comment on my situation.

Dependencies(build.gradle):

dependencies {
//    developmentOnly("org.springframework.boot:spring-boot-devtools")
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compile('org.springframework.boot:spring-boot-starter-web')
    //compile('org.springframework.boot:spring-boot-starter-security')
    //compile('org.springframework.boot:spring-boot-devtools')

    compile 'mysql:mysql-connector-java:5.1.44'
    jooqRuntime 'mysql:mysql-connector-java:5.1.44'
    //Apache commons libraries
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.1'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
    compile group: 'org.apache.commons', name: 'commons-email', version: '1.4'
    compile group: 'commons-beanutils', name: 'commons-beanutils', version: '1.9.1'
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
    compile group: 'commons-lang', name: 'commons-lang', version: '2.6'
    compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.2'
    compile group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.4'
    compile group: 'com.needstreet', name: 'v4.commons', version: '1.7'
    compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.7.0'
    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.7.0'
    //compile group: 'io.springfox', name: 'springfox-bean-validators', version: '2.7.0'
    compile 'org.jsoup:jsoup:1.12.1'

    compile group: 'org.springframework.data', name: 'spring-data-envers', version: '1.0.6.RELEASE'
    compile("org.springframework.boot:spring-boot-starter-amqp")
// https://mvnrepository.com/artifact/org.thymeleaf/thymeleaf-spring4
//    compile group: 'org.thymeleaf', name: 'thymeleaf-spring4', version: '2.1.4.RELEASE'

    compile group: 'org.springframework.boot', name: 'spring-boot-starter-thymeleaf', version: '1.5.9.RELEASE'


    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompile group: 'org.assertj', name: 'assertj-core', version: '3.5.2'
    compile 'com.razorpay:razorpay-java:1.3.4'
    compile 'com.google.code.gson:gson:2.6.2'

//    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
    compile group: 'org.atmosphere', name: 'atmosphere-runtime', version: '2.4.14'
    compile group: 'javax.inject', name: 'javax.inject', version: '1'
    compile group: 'com.auth0', name: 'java-jwt', version: '3.2.0'
//    compile group: 'org.apache.ignite', name: 'ignite-core', version: '2.1.4'
//    compile group: 'org.apache.ignite', name: 'ignite-core', version: '2.4.0'
//    compile group: 'com.tokbox', name: 'opentok-server-sdk', version: '4.2.0'
    compile group: 'com.tokbox', name: 'opentok-server-sdk', version: '4.3.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security'
    compile group: 'org.springframework.security', name: 'spring-security-jwt'
    compile group: 'org.springframework.security.oauth', name: 'spring-security-oauth2'

//    compile("org.springframework.boot:spring-boot-devtools")
//    testCompile('com.h2database:h2')
    compile 'com.amazonaws:aws-java-sdk-s3'
//    compile "org.flywaydb:flyway-core:5.0.7"
    compile('org.springframework.boot:spring-boot-starter-jooq')
    compile('org.springframework.boot:spring-boot-starter-groovy-templates')
    compile('org.springframework.cloud:spring-cloud-starter-aws-messaging')

    //netflix zuul sdk
   //compile('org.springframework.cloud:spring-cloud-starter-netflix-zuul')
    //compile group: 'com.marcosbarbero.cloud', name: 'spring-cloud-zuul-ratelimit', version: '1.7.5.RELEASE'

    //aws ses sdk
    compile('com.amazonaws:aws-java-sdk-ses')
    compile('com.amazonaws:aws-java-sdk-sns:1.11.267')
    compile group: 'com.github.fge', name: 'jackson-coreutils', version: '1.0'
    compile("org.springframework.boot:spring-boot-starter-cache")
    compile group: 'com.googlecode.libphonenumber', name: 'libphonenumber', version: '8.10.1'
    compile group: 'javax.mail', name: 'mail', version: '1.4.1'
    compile('javax.servlet:jstl')
    compile('org.apache.tomcat.embed:tomcat-embed-jasper')
    compile("com.google.apis:google-api-services-calendar:v3-rev224-1.22.0")
    compile group: "com.twilio.sdk", name: "twilio", version: "7.17.+"
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-mail'
//    compile 'com.mixpanel:mixpanel-java:1.4.3'
//    compile 'org.json:json:20151123'
    compile group: 'com.maxmind.geoip2', name: 'geoip2', version: '2.13.0'
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-csv
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-csv', version: '2.9.6'
    compile "com.stripe:stripe-java:18.7.0"
    compile "org.springframework.boot:spring-boot-starter-websocket"
    implementation "com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:20191001.1"
    compile('org.springframework.session:spring-session-jdbc')


    compile group: 'com.google.firebase', name: 'firebase-admin', version: '6.12.2'

    //mailchimp client sdk dependencies.
    compile group: 'com.github.banana-j', name: 'bananaj', version: '0.6.2'

    // https://mvnrepository.com/artifact/org.apache.commons/commons-compress
    compile group: 'org.apache.commons', name: 'commons-compress', version: '1.20'



    compile group: 'io.projectreactor', name: 'reactor-core', version: '2.0.6.RELEASE'
    compile group: 'io.projectreactor', name: 'reactor-net', version: '2.0.6.RELEASE'
    compile group: 'io.netty', name: 'netty-all', version: '4.1.22.Final'
//    compile group: 'io.projectreactor.spring', name: 'reactor-spring-context', version: '2.0.5.RELEASE'
}



Comment From: snicoll

@sharunthomas as mentioned in the guidelines for contributing, we prefer to use GitHub issues only for bugs and enhancements. For questions, please follow-up on StackOverflow. Note also that Spring Boot 2.0.x is EOL.

Comment From: PaulGobin

@sharunthomas This is how I do it:

@Override
    public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry)
    {
StompBrokerRelayRegistration broker = messageBrokerRegistry.enableStompBrokerRelay(ChatConstants.__BROKER_PREDICATE_TOPIC, ChatConstants.__BROKER_PREDICATE_QUEUE);
            broker.setTcpClient(createTcpClient());
            // broker.setRelayHost(StompBrokerRelayHost);
            // broker.setRelayPort(StompBrokerRelayPort);
            broker.setSystemLogin(brokerUser);
            broker.setSystemPasscode(brokerPassword);
            broker.setClientLogin(stompClientUser);
            broker.setClientPasscode(stompClientPassword);
}
/************************************************************************************************
     * 
     * @return
     */
    private ReactorNettyTcpClient<byte[]> createTcpClient()
    {
        final List<InetSocketAddress> addressList = new ArrayList<>();
        String[] StompBrokerRelayHosts = StringUtils.split(StompBrokerRelayHostCluster, ",");
        for (String host : StompBrokerRelayHosts)
        {
            log.info("**** ADDING STOMP BROKER [" + host + "] TO THE CLUSTER LIST. ****");
            addressList.add(new InetSocketAddress(StringUtils.trim(host), StompBrokerRelayPort));
        }
        final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
        return new ReactorNettyTcpClient<>(client -> client.remoteAddress(() -> addresses.get()), new StompReactorNettyCodec());
    }

/********************************************/
import java.util.Collection;
import java.util.Iterator;

public class RoundRobinList<T> {

    private Iterator<T> iterator;
    private final Collection<T> elements;

    public RoundRobinList(Collection<T> elements)
    {
        this.elements = elements;
        iterator = this.elements.iterator();
    }

    synchronized public T get()
    {
        if (iterator.hasNext())
        {
            return iterator.next();
        } else
        {
            iterator = elements.iterator();
            return iterator.next();
        }
    }

    public int size()
    {
        return elements.size();
    }
}