I have configured registry.setPreserveReceiveOrder(true);
to process received messages in order
and am using Spring Security to perform authorization checks during subscriptions.
Before enabling this option, we were using ExecutorSubscribableChannel
, where exceptions were wrapped in a MessageDeliveryException
and thrown to the higher layers.
After enabling this option, however, exceptions are caught by OrderedMessageChannelDecorator
during processing, preventing error messages from being delivered to the client when subscription authorization fails.
Previously, exceptions were propagated to StompSubProtocolErrorHandler
via StompSubProtocolHandler.handleError()
Is this behavior—preventing exceptions from being propagated to the client—an intended design?
Is there a way to propagate exceptions to the client through StompSubProtocolErrorHandler
?
The versions in use are Spring Boot 3.3.0 and Spring WebSocket/Message 6.1.8.
Comment From: csh0034
The STOMP 1.2 specification states as follows
If the server cannot successfully create the subscription, the server MUST send the client an ERROR frame and then close the connection.
Comment From: bclozel
It's hard to know which code path exactly has changed given your description. Can you provide a minimal sample application on the latest Spring Boot 3.3.x that shows this behavior and works differently with the latest 3.2.x version ? This would help us track down Hat changes and whether this is an unintended effect. Thanks
Comment From: csh0034
I have added a test case to check if StompSubProtocolErrorHandler
is invoked depending on the registry.setPreserveReceiveOrder(true)
configuration.
Additionally, I started working from version 3.3.0, so I'm not sure if the issue was introduced due to changes in the 3.3.x version.
The issue remains the same even when using the latest Spring Boot 3.3.x.
Comment From: csh0034
If an exception occurs during subscription processing
registry.setPreserveReceiveOrder(false);
- ExecutorSubscribableChannel[clientInboundChannel]
public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware {
// ...
@Override
public final boolean send(Message<?> message, long timeout) {
try {
//...
} catch (Exception ex) {
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
if (ex instanceof MessagingException messagingException) {
throw messagingException;
}
throw new MessageDeliveryException(messageToUse, "Failed to send message to " + this, ex); // reach here
}
}
//...
}
public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationEventPublisherAware {
// ...
@Override
public void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> webSocketMessage,
MessageChannel targetChannel) {
for (Message<byte[]> message : messages) {
// ...
try {
// ...
try {
SimpAttributesContextHolder.setAttributesFromMessage(message);
sent = channelToUse.send(message);
if (sent) {
if (this.eventPublisher != null) {
Principal user = getUser(session);
if (isConnect) {
publishEvent(this.eventPublisher, new SessionConnectEvent(this, message, user));
} else if (StompCommand.SUBSCRIBE.equals(command)) {
publishEvent(this.eventPublisher, new SessionSubscribeEvent(this, message, user));
} else if (StompCommand.UNSUBSCRIBE.equals(command)) {
publishEvent(this.eventPublisher, new SessionUnsubscribeEvent(this, message, user));
}
}
}
} finally {
SimpAttributesContextHolder.resetAttributes();
}
} catch (Throwable ex) {
// ...
handleError(session, ex, message); // reach here
}
}
}
// ...
}
registry.setPreserveReceiveOrder(true);
public class OrderedMessageChannelDecorator implements MessageChannel {
// ...
private void sendNextMessage() {
for (;;) {
Message<?> message = this.messages.peek();
if (message != null) {
try {
setTaskHeader(message, new PostHandleTask(message));
if (this.channel.send(message)) {
return;
}
} catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex); // swallow exception
}
}
removeMessage(message);
} else {
this.sendInProgress.set(false);
trySend();
break;
}
}
}
// ...
}