I have two RSocket clients and one RSocket server. Both clients initially subscribe to the broadcast route listenCommand to receive commands from the server. One of the clients then sends a command on the executeCommand route, and the server should forward it to every other RSocketRequester, excluding the sender, over the connections that were already established for listenCommand.

Currently I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command through its listenCommand subscription.

I've seen suggestions to add a client-side handler, but is it possible to solve this on the server? Maybe I'm missing something important?

Expected result: the client that sends the command does not receive it over the broadcast connection, but the other client does.

Rsocket-server controller:

private Sinks.Many<String> executedCommandSink = Sinks.many().multicast()
    .directBestEffort();

@MessageMapping("project.{projectId}.command")
public Flux<String> listenCommand(@DestinationVariable String projectId, RSocketRequester requester) {

    return executedCommandSink.asFlux();
}

@MessageMapping("project.{projectId}.command.execute")
public Mono<?> executeCommand(
        @DestinationVariable String projectId, Mono<String> commandMono, RSocketRequester requester) {

    Set<RSocketRequester> requesters = getRequestersForSendCommand(projectId, requester);

    return commandMono.flatMap(command -> {
        sendCommand(requesters, command);
        return Mono.just(command);
    });
}

private void sendCommand(Set<RSocketRequester> requesters, String command) {
    requesters.forEach(requesterToSend -> requesterToSend
        .route("project.{projectId}.command", command)
        .data(Flux.just(command))
        .retrieveFlux(String.class)
        .subscribe());
}

public Set<RSocketRequester> getRequestersForSendCommand(String projectId, RSocketRequester requester) {
    Set<RSocketRequester> sessionsByProject = ConcurrentHashMap.newKeySet();
    allRequesters.forEachEntry(1L, entry -> {
        if (entry.getValue().equals(projectId) && entry.getKey() != requester) {
            sessionsByProject.add(entry.getKey());
        }
    });
    return sessionsByProject;
}

Rsocket-client:

@Autowired
public RSocketManagerClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {

    clientUUID = UUID.randomUUID().toString();
    log.info("Connecting using client ID: {}", clientUUID);

    this.rsocketRequester = rsocketRequesterBuilder
            .setupData(clientUUID)
            .rsocketStrategies(strategies)
            .connectWebSocket(URI.create("ws://127.0.0.1:8307/rsocket"))
            .subscribeOn(Schedulers.parallel())
            .block();

    this.rsocketRequester.rsocket()
            .onClose()
            .doFirst(() -> log.info("Client: {} CONNECTED.", clientUUID))
            .doOnError(error -> log.error("Connection to client {} CLOSED", clientUUID))
            .doFinally(consumer -> log.info("Client {} DISCONNECTED", clientUUID))
            .subscribe();
}

public void executeCommand() {
    log.info("\nClient with id-{} subscribe on execute command", clientUUID);

    String block = this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command.execute")
            .data("Hello")
            .retrieveMono(String.class)
            .doOnNext(System.out::println)
            .block();
    log.info("Response: {}", block);
}

public void subscribeOnCommand() {
    log.info("\nClient with id-{} subscribe on command", clientUUID);

    this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command")
            .retrieveFlux(String.class)
            .doOnNext(System.out::println)
            .subscribe();
}
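
To make the expected result concrete, here is a minimal sketch of the "everyone except the sender" rule in plain Java, without RSocket or Reactor. The class CommandHub, its methods listen and execute, and the client IDs are hypothetical names used only for illustration; they are not part of the project above. The idea is that every command is tagged with its sender and each listener skips its own commands. In the controller this would presumably correspond to emitting sender-tagged commands into executedCommandSink and filtering inside listenCommand, so that the server never has to issue a request back to a client. The server-to-client retrieveFlux call in sendCommand appears to be what produces "Request-Stream not implemented" when the client has no responder registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the server-side sink: every command carries the ID
// of its sender, and each listener is skipped for commands it sent itself.
public class CommandHub {

    // clientId -> callback standing in for that client's listenCommand stream
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();

    // Corresponds to a client subscribing to the broadcast route.
    public void listen(String clientId, Consumer<String> onCommand) {
        listeners.put(clientId, onCommand);
    }

    // Corresponds to executeCommand: deliver to every listener except the sender.
    public void execute(String senderId, String command) {
        listeners.forEach((clientId, onCommand) -> {
            if (!clientId.equals(senderId)) {
                onCommand.accept(command);
            }
        });
    }

    public static void main(String[] args) {
        CommandHub hub = new CommandHub();
        List<String> receivedByA = new ArrayList<>();
        List<String> receivedByB = new ArrayList<>();
        hub.listen("client-A", receivedByA::add);
        hub.listen("client-B", receivedByB::add);

        hub.execute("client-A", "Hello");

        System.out.println("A received: " + receivedByA); // A received: []
        System.out.println("B received: " + receivedByB); // B received: [Hello]
    }
}
```

With this shape the filtering happens where the stream is handed out, so it only relies on the subscriptions the clients have already opened.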

Comment From: rstoyanchev

Thanks for getting in touch, but it feels like this is a question that would be better suited to Stack Overflow. As mentioned in the guidelines for contributing, we prefer to use the issue tracker only for bugs and enhancements. Feel free to update this issue with a link to the re-posted question (so that other people can find it) or add some more details if you feel this is a genuine bug.