Alex opened SPR-12048 and commented
The issue is described on StackOverflow (http://stackoverflow.com/questions/24991275/rpc-over-stomp-using-spring-and-correctly-handling-server-side-errors-propagate) but since it seems a general problem I hope it's ok to open a ticket for it.
While using @MessageMapping is fine for normal messaging, I find @SendToUser quite limiting for implementing RPC, because the client has a hard time working out which reply belongs to which request when it makes multiple simultaneous requests.
Of course there is no problem when only one request is made and the client waits for its reply, but problems arise when the client has to keep track of multiple "open" RPC calls.
I've managed to make the system mostly fine by associating an ID with every request, i.e.: the client sends an id together with the message, and the server replies with a special message wrapper that contains this id, so the client is able to associate asynchronous replies with requests.
This works fine but has several limitations:
- I have to develop code that understands this structure, which defeats the purpose of having simple annotated methods.
- When the server-side code throws an exception, the Spring @MessageExceptionHandler gets called and the correct exception is returned to the client, but the request id is lost because the handler has no (easy) way to access it.
I know that with RabbitMQ we can add a "reply-to" header to every request that needs to be associated with a special reply (the RPC response), and that this is implemented by creating a special temporary queue that the user is automatically subscribed to, but how may I use this scheme in Spring? Also, that would tie me to a specific broker.
How may I elegantly implement a correct RPC call in Spring that correctly handles server side exceptions?
I find this a general problem and I think Spring could benefit greatly from implementing it natively.
Affects: 4.0.6
1 votes, 7 watchers
Comment From: spring-projects-issues
Rossen Stoyanchev commented
My first reaction is that this is the role of destinations. On the client side you subscribe to as many destinations as you need, and that helps to differentiate which message is for what. Perhaps a sketch of a simple concrete example would help me understand what you mean.
Comment From: spring-projects-issues
Alex commented
The problem is efficiency: I don't want to make two round trips for every RPC call (one to subscribe to a unique queue for that call, and the other for the call itself), since efficiency is the reason I chose a WebSocket protocol over the usual HTTP/REST in the first place.
Anyway, even with a custom queue for every RPC call the problem persists, because it lies in the exception handling: by design, exceptions get sent to a single well-defined queue that is the same for every RPC call (the one defined by @MessageExceptionHandler), so the client cannot tell which of the multiple active calls generated the exception.
My advice would be to let the caller put an ID in a header of the RPC request and have Spring deliver this ID in the reply posted on the out queue (also in the exception handling); this would allow the caller to associate replies with requests correctly.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
I'm sure you have a good point in there but without a concrete example I'm left guessing and I can't respond adequately. Could you still provide a basic outline to give me a more concrete idea of what you are doing? No need for working code, just some example @MessageMapping and @SubscribeMapping methods and example destinations.
Comment From: spring-projects-issues
Alex commented
As a proof of concept I've modified the SendToMethodReturnValueHandler class, and this is how my code works now.
On the client:
* Subscribe to queue /user/queue/replies with a handler called handleRpcReplies.
* Subscribe to queue /user/queue/errors with a handler called handleRpcErrors.
* The client keeps track of the number of RPC calls it has made since the start; call this "counter".
* Each time the client has to make an RPC call for "op", it does the following:
  * Prepare the message with the op name and parameters.
  * Add the header "myId=counter".
  * Add the counter, together with the caller's associated info (for example a deferred object obtained with $q in AngularJS), to a local map used to track active calls.
  * Send the STOMP message to the appropriate destination (depending on the op).
  * Increment the counter.
* Since the calls are asynchronous and the server may need some time to execute them, the client may have multiple active calls waiting for replies, which is why a map is used to track all the calls still awaiting a reply.
* When a message arrives on "/user/queue/replies", handleRpcReplies gets called, and the first thing it does is extract the "myId" value from the headers. It uses that value to look up the map of active calls and passes the result to the deferred object, thus completing the call (it also removes the element from the map).
* When a message arrives on the error queue, handleRpcErrors gets called and does basically the same thing, except that it calls the error method of the deferred object to notify the caller that there was an exception.
The code is really simple and elegant if implemented using the deferred object of AngularJS, provided that the server correctly carries "myId" over to the response message. I can provide sample code if you need the gory details.
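A minimal sketch of this client-side bookkeeping, written in Java for consistency with the server code rather than in AngularJS. CompletableFuture stands in for the $q deferred object, and the class name PendingCalls and its register method are hypothetical names chosen for illustration; only the "myId" header and the two handler names come from the description above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical registry that pairs asynchronous replies with pending RPC calls by id. */
class PendingCalls {
    // Number of RPC calls made so far; its current value becomes the next "myId".
    private final AtomicLong counter = new AtomicLong();
    // Active calls that are still waiting for a reply or an error.
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a call and returns the id to send in the "myId" header of the request. */
    String register(CompletableFuture<Object> future) {
        String id = Long.toString(counter.getAndIncrement());
        pending.put(id, future);
        return id;
    }

    /** Would be invoked for each message arriving on /user/queue/replies. */
    void handleRpcReplies(String myId, Object payload) {
        CompletableFuture<Object> future = pending.remove(myId);
        if (future != null) {
            future.complete(payload);
        }
    }

    /** Would be invoked for each message arriving on /user/queue/errors. */
    void handleRpcErrors(String myId, String errorMessage) {
        CompletableFuture<Object> future = pending.remove(myId);
        if (future != null) {
            future.completeExceptionally(new RuntimeException(errorMessage));
        }
    }

    /** Number of calls still waiting for a reply. */
    int activeCalls() {
        return pending.size();
    }
}
```

Because each entry is looked up by id, replies may arrive in any order, and an error completes only the call that caused it. All of this depends on the server echoing "myId" on both the reply and the error message.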
To make this work, on the server I have a pretty standard class:
@Controller
public class MessagingController {

    @MessageMapping("/rpc/op1")
    @SendToUser("/queue/replies")
    public Object op1(MyObject input) {
        ...
        return object;
    }

    @MessageMapping("/rpc/op2")
    @SendToUser("/queue/replies")
    public Object op2(MyObject input) {
        ...
        return object;
    }

    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public Object handleException(MethodArgumentNotValidException e) {
        ...
        return object;
    }
}
Comment From: spring-projects-issues
Alex commented
That's how I modified the Spring class:
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java
index dd0b521..15f2413 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java
@@ -140,7 +140,8 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
         }
         MessageHeaders headers = message.getHeaders();
         String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
-
+        String myId = SimpMessageHeaderAccessor.wrap(message).getFirstNativeHeader("myId");
+
         SendToUser sendToUser = returnType.getMethodAnnotation(SendToUser.class);
         if (sendToUser != null) {
             boolean broadcast = sendToUser.broadcast();
@@ -155,10 +156,10 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
             String[] destinations = getTargetDestinations(sendToUser, message, this.defaultUserDestinationPrefix);
             for (String destination : destinations) {
                 if (broadcast) {
-                    this.messagingTemplate.convertAndSendToUser(user, destination, returnValue);
+                    this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, createHeaders(null, myId));
                 }
                 else {
-                    this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, createHeaders(sessionId));
+                    this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, createHeaders(sessionId, myId));
                 }
             }
         }
@@ -166,7 +167,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
             SendTo sendTo = returnType.getMethodAnnotation(SendTo.class);
             String[] destinations = getTargetDestinations(sendTo, message, this.defaultDestinationPrefix);
             for (String destination : destinations) {
-                this.messagingTemplate.convertAndSend(destination, returnValue, createHeaders(sessionId));
+                this.messagingTemplate.convertAndSend(destination, returnValue, createHeaders(sessionId, myId));
             }
         }
     }
@@ -195,13 +196,16 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
                 new String[] {defaultPrefix + destination} : new String[] {defaultPrefix + "/" + destination});
     }
-    private MessageHeaders createHeaders(String sessionId) {
+    private MessageHeaders createHeaders(String sessionId, String replyId) {
         SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
         if (getHeaderInitializer() != null) {
             getHeaderInitializer().initHeaders(headerAccessor);
         }
-        headerAccessor.setSessionId(sessionId);
+        if (sessionId != null)
+            headerAccessor.setSessionId(sessionId);
         headerAccessor.setLeaveMutable(true);
+        if (replyId != null)
+            headerAccessor.addNativeHeader("myId", replyId);
         return headerAccessor.getMessageHeaders();
     }
Of course this modification cannot be proposed as is, because the header I pass from the input message to the output message is hardcoded and that should be generalized. I don't know the best way to do that, otherwise I would submit a pull request.
Feel free to ask any detail if anything is not clear!
Comment From: spring-projects-issues
Alex commented
I've tried to generalize the copying of headers by modifying the @SendToUser annotation.
My proposal is now available as a pull request here: https://github.com/spring-projects/spring-framework/pull/612
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Thanks for the extra details, I now have a pretty good idea! Essentially you're sending many "request" messages to a given destination and want a way to correlate the "reply" messages on the client side, be it success or failure.
Although a "copyHeaders" attribute will work, technically @SendToUser (and @SendTo) by themselves don't imply a strong request-reply interaction. In this case it happens to be one, but in the future they could be supported in other scenarios (e.g. a client-side interface for sending messages only, not replying).
I think what we can do instead is add support for a correlation id in SendToMethodReturnValueHandler. If the incoming message has it, we'll add it to the sent message.
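A rough sketch of that idea, using plain maps in place of Spring's MessageHeaders. The header names "correlation-id" and "sessionId" and the helper class are assumptions made for illustration only; they are not an existing Spring API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that carries a correlation id from an inbound message to its reply. */
final class CorrelationHeaders {
    // Assumed header names, for illustration only.
    static final String CORRELATION_ID = "correlation-id";
    static final String SESSION_ID = "sessionId";

    private CorrelationHeaders() {
    }

    /** Builds reply headers, copying the correlation id only if the inbound message has one. */
    static Map<String, Object> replyHeaders(Map<String, Object> inbound, String sessionId) {
        Map<String, Object> outbound = new HashMap<>();
        if (sessionId != null) {
            outbound.put(SESSION_ID, sessionId);
        }
        Object correlationId = inbound.get(CORRELATION_ID);
        if (correlationId != null) {
            outbound.put(CORRELATION_ID, correlationId);
        }
        return outbound;
    }
}
```

Messages that carry no correlation id produce the same headers as before, so existing clients would be unaffected.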
Comment From: spring-projects-issues
Alex commented
Hello Rossen, and thanks for your reply!
Yes, the main problem is to correlate failures relative to some calls, but also to simplify correlation between RPC calls and their results: for example the client can issue two RPC calls for the same destination and receive the two results in reverse order. The id in the headers will be used by the client to forward the correct reply to the right caller function.
The proposed code doesn't imply a strong correlation between request and reply, and it copies the defined headers only if they are present, so it's completely backward compatible.
The code is not perfect though and could be improved, in the sense that an empty header structure is created even in the (unlikely?) event that the message has no user, no sessionId, and no headers to copy.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
An @SendToUser with an attribute such as copyHeaders implies that headers have to be copied from a message being processed. Perhaps not request-reply strictly speaking, but we can't make the assumption that @SendToUser is on a method that's processing a message. So this has nothing to do with how you implemented it. It's purely about the design of the annotation.
Comment From: spring-projects-issues
Rossen Stoyanchev commented
Modified title (was: "Better handling of exceptions during RPC calls")
Comment From: spring-projects-issues
Alex commented
I know that the Spring 4.1 release is being prepared, but if there is a way to have this little patch included with some additional work, please advise; I'll be very happy to try to help.
This simple modification cuts almost a third of the code I have to write :)
Comment From: spring-projects-issues
Ben Kiefer commented
Any hope of seeing an extension point that allows us to access the inbound request headers and apply them to the outbound message? The MessageHeaderInitializer does not have access to inbound request headers, so we don't have the option to do anything at that extension point.
SendToMethodReturnValueHandler and SubscriptionMethodReturnValueHandler would both need to be updated to allow for this.
Comment From: rstoyanchev
One option for this is to inject the correlation header and use SimpMessagingTemplate to send the message with the correlation id set. Another is to make the destination unique, which can be mapped with a pattern such as /rpc/op1/{id}.
Comment From: rstoyanchev
Closing for now, but can be re-opened if needed.