Affects: 5.3.3


I'm trying to use RSocket to have two-way streaming between client and server, with server (sometimes) being the first to start sending data. The problem I have is that Spring/RSocket will delay establishing REQUEST_CHANNEL until it has a first payload to sent to server, but I would like it to establish that immediately on subscribe.

Here's a short reproducer. Client subscribes, and than sends initial data with 2 seconds delay. The server handler only gets called after that delay.

package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

import java.net.URI;
import java.time.Duration;

@SpringBootApplication
public class ExampleApplication {
    private static final Logger log = LoggerFactory.getLogger(ExampleApplication.class);

    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(ExampleApplication.class, args);
        RSocketRequester requester = ctx.getBean(RSocketRequester.Builder.class)
            .websocket(URI.create("ws://localhost:8080/rsocket"));

        requester.route("example")
            .data(
                Flux.just("A", "B")
                    .doOnSubscribe(subscription -> log.info("Client subscribed"))
                    .delayElements(Duration.ofSeconds(2))
                    .doOnNext(value -> log.info("Client sending {}", value))
            )
            .retrieveFlux(String.class)
            .blockLast();

        requester.rsocketClient().dispose();
        ctx.close();
    }
}

@Controller
class ExampleController {
    private static final Logger log = LoggerFactory.getLogger(ExampleController.class);

    @MessageMapping("example")
    Flux<String> example(Flux<String> requests) {
        // This should've be logged immediately after "Client subscribed", and before "Client sending"
        log.info("Server received request");
        return requests
            .doOnNext(value -> log.info("Server received {}", value))
            .thenMany(Flux.empty());
    }
}
spring:
  rsocket:
    server:
      transport: websocket
      mapping-path: /rsocket

As a workaround, I'm having client send some dummy data immediately, which server than ignores.

Comment From: rstoyanchev

This is reproducible without Spring:

  public static void main(String[] args) {

    logger.info("Start of sample");

    SocketAcceptor echoAcceptor =
        SocketAcceptor.forRequestChannel(
            (publisher -> {
              logger.info("Server received request");
              return Flux.from(publisher).thenMany(Mono.empty());
            }));

    RSocketServer.create(echoAcceptor)
        .bind(TcpServerTransport.create("localhost", 7000))
        .subscribe();

    RSocket rsocket =
        RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)).block();

    Flux<Payload> payloads =
        Flux.just("A", "B")
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(s -> logger.info("Echo: " + s))
            .map(DefaultPayload::create);

    rsocket
        .requestChannel(payloads)
        .map(Payload::getDataUtf8)
        .doOnNext(logger::debug)
        .take(10)
        .doFinally(signalType -> rsocket.dispose())
        .then()
        .block();
  }

It is how RSocket Java and also the RSocket protocol work since the REQUEST_CHANNEL frame, like all request frames, contains both metadata and initial data. So this will have to be a change on that level in the very least.

Thinking about it, while I can understand the surprise, intuitively the current behavior makes sense. The alternative would be to require some way of explicitly indicating that you want to start without an initial data payload. Then there would have to be a way to indicate on the REQUEST_CHANNEL frame that the data is to be ignored (not quite the same as "is empty"). That would require a little extra complexity in the protocol and the RSocket Java API.

Have you considered whether it really needs to be a request channel? As opposed to say, start a request stream independently from either side, or even use request response periodically. It is all multiplexed on the same connection so there is no penalty and most often there is no need to model as request channel unless the streams area really inter-dependent.

Comment From: hban

Yes, I'm aware that RSocket protocol requires data for REQUEST_CHANNEL frame. And I'm not sure changing this on RSocket (Java) level would be a correct place. RSocket only cares about moving payloads around, it's up to applications to ignore them or not.

The behavior I (incorrectly I guess) expected was that Spring would immediately send initial payload with metadata and empty data, and than transmit actual Flux elements as data-only payloads.

Regarding your suggestion with two separate streams - it had crosses my mind, but it more difficult to do since I have multiple clients/server nodes and load balancing in the mix. Using single REQUEST_CHANNEL was easier than trying to have server hunt down correct client RSocket to initialize backward stream.

I guess this is less a bug and more a feature request. If it's not feasible to do, than feel free to close this issue. I don't mind using my "dummy value first" workaround.

Comment From: rstoyanchev

This can't be done in Spring without making assumptions that both sides are Java, Spring (and spring-messaging for that). The moment one side isn't, it no longer works.

In addition, when you supply a Publisher it's not known when the first data item will materialize. We could always insert empty data but that would be odd (not to mention less efficient) for cases where the input data does show up without delay. Or if even if it's something like 2 seconds late, it might still be preferable to wait, depending on the use case.

So it has to be explicit and it has to be a feature at a lower level below Spring. Or it can be done at application level where you know better what's on each side.