Affects: 2.7.18


I'm using SimpMessagingTemplate#convertAndSend to send messages to a STOMP client with the following code.

  @SubscribeMapping("/jobs/{id}/changes")
  public List<Change> changes(@DestinationVariable Long id) throws InterruptedException {
    log.info("Receive subscribe changes of job {}", id);
    new Thread(() -> {
      int i = 6;
      for (int j = 0; j < 5; j++) {
        List<Change> ces = new ArrayList<>();
        ces.add(new Change(Instant.now(), String.format("This is change %s", i++)));
        messagingTemplate.convertAndSend(String.format("/app/jobs/%s/changes", id), ces);
      }

    }).start();
    List<Change> changes = new ArrayList<>();
    changes.add(new Change(Instant.now(), "This is change 1"));
    changes.add(new Change(Instant.now(), "This is change 2"));
    changes.add(new Change(Instant.now(), "This is change 3"));
    changes.add(new Change(Instant.now(), "This is change 4"));
    changes.add(new Change(Instant.now(), "This is change 5"));
    Thread.sleep(5000);
    return changes;
  }

One or two of messages 6 to 10 are never received by the client. If I add a sleep(10) before convertAndSend, all of them arrive.

Comment From: snicoll

What is 2.15.0? I am afraid we can't investigate a problem based on a partial code snippet. Do you have a sample handy for us?

Comment From: tinystorm

Sorry, I read the wrong version. The Spring Boot version is 2.7.18. Here is the full code.

Backend code

package xxx.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * stomp config
 *
 * @author meng
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {


  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry
        .addEndpoint("/stomp")
        .setAllowedOriginPatterns("*")
        .withSockJS();
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config.setApplicationDestinationPrefixes("/app");
    config.enableSimpleBroker("/app");
  }

}

package xxxr.stomp;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

@RestController("/api/v1")
@Slf4j
@AllArgsConstructor
public class MessageController {
  private final SimpMessagingTemplate messagingTemplate;

  @SubscribeMapping("/jobs/{id}/changes")
  public List<Change> changes(@DestinationVariable Long id) throws InterruptedException {
    log.info("Receive subscribe changes of job {}", id);
    new Thread(() -> {
      int i = 6;
      for (int j = 0; j < 5; j++) {
        List<Change> ces = new ArrayList<>();
        ces.add(new Change(Instant.now(), String.format("This is change %s", i++)));
        messagingTemplate.convertAndSend(String.format("/app/jobs/%s/changes", id), ces);
      }

    }).start();
    List<Change> changes = new ArrayList<>();
    changes.add(new Change(Instant.now(), "This is change 1"));
    changes.add(new Change(Instant.now(), "This is change 2"));
    changes.add(new Change(Instant.now(), "This is change 3"));
    changes.add(new Change(Instant.now(), "This is change 4"));
    changes.add(new Change(Instant.now(), "This is change 5"));
    Thread.sleep(5000);
    return changes;
  }
}

package xxx;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebApplication {
  public static void main(String[] args) {
    SpringApplication.run(WebApplication.class);
  }
}

Frontend code

import './App.css'
import {useState} from "react";
import {StompSessionProvider, useSubscription} from "react-stomp-hooks";

const ChangeLog = () => {
    const [changes, setChanges] = useState([]);
    console.log("Ready to subscribe!")
    useSubscription('/app/jobs/1/changes', message => {
        console.log("Receive message",message)
        setChanges([...changes, ...JSON.parse(message.body)])
    })
    return (<>
        <h1>Stomp POC</h1>
        {changes.map((change, index) => (<h5 key={index}>{change.time} : {change.content}</h5>))}
    </>)
}
const App = () => (
    <StompSessionProvider url='/stomp'>
        <ChangeLog/>
    </StompSessionProvider>
);

export default App

Comment From: snicoll

Can you please move that to a sample we can run ourselves with instructions on how to reproduce? Pasting code in text is not helping as it's not complete and we may miss an important step when trying to recreate what you see.

Comment From: tinystorm

I looked carefully at the frontend WebSocket message flow and found that all the messages were there. The problem is probably in my front-end processing. Sorry to bother you.
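
The thread does not say what the front-end problem turned out to be. One plausible cause, which is an assumption based only on the ChangeLog component above, is that setChanges([...changes, ...]) spreads the `changes` value captured at render time. If two messages arrive before the component re-renders, the second update overwrites the first. The sketch below reproduces that effect without React; createBatchedState is a hypothetical stand-in for a batched state hook, not a React or react-stomp-hooks API.

```javascript
// Hypothetical stand-in for a batched state hook: updates are queued and
// applied later, similar to how setState calls in the same tick are batched.
function createBatchedState(initial) {
  let state = initial;
  const queue = [];
  return {
    get: () => state,
    set: (update) => queue.push(update),
    flush: () => {
      for (const update of queue) {
        // An updater function receives the latest state; a plain value replaces it.
        state = typeof update === "function" ? update(state) : update;
      }
      queue.length = 0;
    },
  };
}

// Two message bodies arrive back to back, before any re-render.
const incoming = [["change 6"], ["change 7"]];

// Pattern from the component: every update spreads the same render-time snapshot.
const stale = createBatchedState([]);
const snapshot = stale.get();
for (const body of incoming) {
  stale.set([...snapshot, ...body]);
}
stale.flush();
const staleResult = stale.get(); // only the last message survives

// Functional update: every update builds on the previous state.
const fresh = createBatchedState([]);
for (const body of incoming) {
  fresh.set((prev) => [...prev, ...body]);
}
fresh.flush();
const freshResult = fresh.get(); // both messages are kept

console.log(staleResult, freshResult);
```

If this is indeed the cause, the corresponding change in the component would be to pass an updater function, setChanges(prev => [...prev, ...JSON.parse(message.body)]). It would also be consistent with the observation that a short sleep between sends hides the problem, since each message then gets its own render.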

Comment From: bclozel

Thanks for letting us know!