spencercw opened SPR-17315 and commented

I'm trying to set up a project using WebSockets with the 'subscribe and snapshot' pattern (i.e., subscribing to a stream of updates and requesting a snapshot for initialisation).

From what I've been able to gather, the intended way to get the initial snapshot is to use @SubscribeMapping. I have put together a simple test class:

@Controller
public class GreetingController {
    private Logger logger = LoggerFactory.getLogger(GreetingController.class);

    private final SimpMessagingTemplate messagingTemplate;

    private AtomicInteger value = new AtomicInteger();

    @Autowired
    public GreetingController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @SubscribeMapping("/greetings")
    public int init() {
        int x = value.get();
        logger.info("init " + x);
        return x;
    }

    @Scheduled(fixedRate = 1000)
    public void poll() {
        int x = value.incrementAndGet();
        logger.info("pushing " + x);
        messagingTemplate.convertAndSend("/topic/greetings", x);
    }
}

My client first subscribes to /topic/greetings and then /app/greetings. By doing it in this order, updates may be delivered to the client before the initial snapshot (which is fine), but the reverse order would create a brief period where an update could be generated after the snapshot but not delivered to the client.

You can see in the log below, both subscription messages are received in thread 'http-nio-8080-exec-9'. From there they are passed to the 'clientInboundChannel' thread pool. The topic subscription is handled by 'clientInboundChannel-5' while the app subscription is handled by 'clientInboundChannel-7'.

There is a race condition here because the topic subscription could be delayed long enough for an update to be generated after the app subscription has completed but before the topic subscription has completed, which would not be delivered to the client. I can artificially induce this by putting a breakpoint in SimpleBrokerMessageHandler.handleMessageInternal() and suspending only that thread.

As far as I can tell, it should be safe if I limit the thread pool to one thread, but that's obviously not ideal. Am I doing something wrong; is there some better way to do this?

2018-09-30 15:46:27.330 TRACE 32092 --- [nio-8080-exec-9] o.s.messaging.simp.stomp.StompDecoder    : Decoded SUBSCRIBE {id=[sub-0], destination=[/topic/greetings]} session=null
2018-09-30 15:46:27.330 TRACE 32092 --- [nio-8080-exec-9] o.s.w.s.m.StompSubProtocolHandler        : From client: SUBSCRIBE /topic/greetings id=sub-0 session=ok5b3h1f
2018-09-30 15:46:27.331 TRACE 32092 --- [nio-8080-exec-9] ConfigServletWebServerApplicationContext : Publishing event in org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@536dbea0: SessionSubscribeEvent[GenericMessage [payload=byte[0], headers={simpMessageType=SUBSCRIBE, stompCommand=SUBSCRIBE, nativeHeaders={id=[sub-0], destination=[/topic/greetings]}, simpSessionAttributes={}, simpHeartbeat=[J@63b1df68, simpSubscriptionId=sub-0, simpSessionId=ok5b3h1f, simpDestination=/topic/greetings}]]
2018-09-30 15:46:27.331 DEBUG 32092 --- [nboundChannel-5] o.s.m.s.b.SimpleBrokerMessageHandler     : Processing SUBSCRIBE /topic/greetings id=sub-0 session=ok5b3h1f
2018-09-30 15:46:27.331 DEBUG 32092 --- [nio-8080-exec-9] o.a.t.websocket.server.WsFrameServer     : WebSocket frame received. fin [true], rsv [4], OpCode [1], payload length [13]
2018-09-30 15:46:27.332 TRACE 32092 --- [nio-8080-exec-9] s.w.s.h.LoggingWebSocketHandlerDecorator : Handling TextMessage payload=[SUBSCRIBE
..], byteCount=48, last=true] in WebSocketServerSockJsSession[id=ok5b3h1f]
2018-09-30 15:46:27.332 TRACE 32092 --- [nio-8080-exec-9] o.s.messaging.simp.stomp.StompDecoder    : Decoded SUBSCRIBE {id=[sub-1], destination=[/app/greetings]} session=null
2018-09-30 15:46:27.332 TRACE 32092 --- [nio-8080-exec-9] o.s.w.s.m.StompSubProtocolHandler        : From client: SUBSCRIBE /app/greetings id=sub-1 session=ok5b3h1f
2018-09-30 15:46:27.332 TRACE 32092 --- [nio-8080-exec-9] ConfigServletWebServerApplicationContext : Publishing event in org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@536dbea0: SessionSubscribeEvent[GenericMessage [payload=byte[0], headers={simpMessageType=SUBSCRIBE, stompCommand=SUBSCRIBE, nativeHeaders={id=[sub-1], destination=[/app/greetings]}, simpSessionAttributes={}, simpHeartbeat=[J@6148f8ed, simpSubscriptionId=sub-1, simpSessionId=ok5b3h1f, simpDestination=/app/greetings}]]
2018-09-30 15:46:27.332 DEBUG 32092 --- [nio-8080-exec-9] o.a.tomcat.util.net.SocketWrapperBase    : Socket: [org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper@3fdb1e3c:org.apache.tomcat.util.net.NioChannel@311ce66f:java.nio.channels.SocketChannel[connected local=0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:8080 remote=/0:0:0:0:0:0:0:1:32205]], Read from buffer: [0]
2018-09-30 15:46:27.332 DEBUG 32092 --- [nio-8080-exec-9] org.apache.tomcat.util.net.NioEndpoint   : Socket: [org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper@3fdb1e3c:org.apache.tomcat.util.net.NioChannel@311ce66f:java.nio.channels.SocketChannel[connected local=0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:8080 remote=/0:0:0:0:0:0:0:1:32205]], Read direct from socket: [0]
2018-09-30 15:46:27.332 DEBUG 32092 --- [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SUBSCRIBE /app/greetings id=sub-1 session=ok5b3h1f, lookupDestination='/greetings'
2018-09-30 15:46:27.334 TRACE 32092 --- [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Found 1 handler methods: [{[/greetings],messageType=[SUBSCRIBE]}]
2018-09-30 15:46:27.334 DEBUG 32092 --- [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Invoking hello.GreetingController#init[0 args]
2018-09-30 15:46:27.334 DEBUG 32092 --- [nboundChannel-7] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'greetingController'
2018-09-30 15:46:27.334 TRACE 32092 --- [nboundChannel-7] o.s.m.h.i.InvocableHandlerMethod         : Invoking 'hello.GreetingController.init' with arguments []
2018-09-30 15:46:27.334  INFO 32092 --- [nboundChannel-7] hello.GreetingController                 : init 9
2018-09-30 15:46:27.334 TRACE 32092 --- [nboundChannel-7] o.s.m.h.i.InvocableHandlerMethod         : Method [hello.GreetingController.init] returned [9]
2018-09-30 15:46:27.336 TRACE 32092 --- [nboundChannel-7] HandlerMethodReturnValueHandlerComposite : Processing return value with org.springframework.messaging.simp.annotation.support.SubscriptionMethodReturnValueHandler@4cb10416
2018-09-30 15:46:27.336 DEBUG 32092 --- [nboundChannel-7] a.s.SubscriptionMethodReturnValueHandler : Reply to @SubscribeMapping: 9

Affects: 5.0.9

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Starting with /app/greetings creates a potential gap, while starting with /topic/greetings creates a potential overlap. So you have to start receiving the latest first via /topic/greetings, then get the history via /app/greetings, and filter out any overlap from the history.

Comment From: spring-projects-issues

spencercw commented

Hi Rossen. I understand that, but there doesn't seem to be any way to reliably subscribe to the topic first under the default configuration where the thread pool has more than one thread. I can send the subscriptions in order, but because of the race condition described, they may be processed out of order. In principle I could wait for the first subscription to complete before sending the second subscription, but, as far as I can tell, the simple broker doesn't have any support for acknowledging subscriptions. I can't just wait for the first real message on the topic channel because in practice, if nothing is changing, no messages will be posted to the channel. This would also add an extra round-trip of latency before the first data is rendered so is not great.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Okay I understand now.

Indeed without a receipt from the topic subscription, the history may be incomplete. The simple broker does not support receipts, which is a STOMP specific header and frame, but it could be done through a ExecutorChannelInterceptor that's automatically installed if the simple broker is in use with STOMP.

I'll turn this into a ticket to support receipts with the simple broker for 5.2. In the mean time you can add the following to your configuration and that will generate RECEIPT frames:

@Configuration
static class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private MessageChannel outChannel;

    @Autowired
    public WebSocketConfig(MessageChannel clientOutboundChannel) {
        this.outChannel = clientOutboundChannel;
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        registration.interceptors(new ExecutorChannelInterceptor() {

            @Override
            public void afterMessageHandled(Message<?> inMessage,
                    MessageChannel inChannel, MessageHandler handler, Exception ex) {

                StompHeaderAccessor inAccessor = StompHeaderAccessor.wrap(inMessage);
                String receipt = inAccessor.getReceipt();
                if (StringUtils.isEmpty(receipt)) {
                    return;
                }

                StompHeaderAccessor outAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
                outAccessor.setSessionId(inAccessor.getSessionId());
                outAccessor.setReceiptId(receipt);
                outAccessor.setLeaveMutable(true);

                Message<byte[]> outMessage =
                        MessageBuilder.createMessage(new byte[0], outAccessor.getMessageHeaders());

                outChannel.send(outMessage);
            }
        });
    }
} 

Comment From: spring-projects-issues

spencercw commented

Subscription receipts would certainly be a welcome enhancement, but it doesn't feel like the best solution to this issue because it forces an extra round-trip to the server before anything is rendered (i.e., send subscription, wait for acknowledgement, send request for initial data). On a slow connection this could add a considerable amount of latency.

I have spotted another ticket #20087 which sounds like basically the same problem on the outbound channels. The same solution could be used here if that were implemented.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Unlike the outbound side where there is only a single subscriber (StompSubProtocolHandler) on the inbound side there are multiple handlers (broker, annotated controllers, user destination handler, etc). We'd have to somehow wait till all handlers are done, but the ExecutorChannelInterceptor is only designed to be called before and after a given handler on the Executor thread. Waiting on all handlers would also reduce throughput and negate some of the benefits of concurrent handling.

Serializing from the client side when needed seems like a more general solution. If the connection is to slow or hard to predict, you could introduce an intentional slowdown in the controller method with a predictable delay:

@SubscribeMapping("/greetings")
public CompletableFuture<Integer> init() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    scheduler.schedule(() -> future.complete(value.get()), Instant.now().plusMillis(500));
    return future;
}

Comment From: spring-projects-issues

Rossen Stoyanchev commented

Note to self to add extra guidance in the reference docs based on this discussion.

Comment From: spring-projects-issues

spencercw commented

I'm just pondering this a bit more and it feels like there are probably more issues here. All messages are received on a single thread and are then dispatched to the thread pool for handling where they may be processed in a different order to how they are received. What happens if a connect, subscribe message pair is processed as subscribe, connect. Or subscribe, unsubscribe on the same topic is processed as unsubscribe, subscribe (I could see someone doing this for the initial 'app' subscription on the expectation that the data will be delivered immediately upon subscription, so there's no point keeping it open).

It might be safer to process 'control' (connection and subscription related) messages synchronously and dispatch regular 'data' messages to the thread pool.

Comment From: spring-projects-issues

Rossen Stoyanchev commented

You cannot do anything until connect-connected frames have been exchanged, and you cannot subscribe or send anything further until that's done. That's required to negotiate things like protocol version and heart beat intervals. The "/app" subscriptions only reach a controller method, which replies and the subscription is never stored, and never used again. Theoretically a subscribe-unsubscribe could be processed out of order but sending those at the same time doesn't make much sense either.

Comment From: rishiraj88

👍