Affects: webmvc 6.x
We have a new application that uses the `SseEmitter` to send events back to clients as they arrive in our system via a Kafka consumer. Recently this application started crashing every few days with heap OOM errors. After some analysis, it looks like what is happening is that the connection gets terminated, but the `.onError` handler is never called. As a result we treat the connection as still available and keep publishing events to it.
As part of the error, it looks like the underlying `handler` object is set to null. As a result, each message is queued in the `earlySendAttempts` `LinkedHashSet`, which keeps accumulating messages until a heap OOM occurs.
Someone else reported the issue on Stack Overflow here.
This looks like it was introduced in the following commit from @bclozel.
Reproducing this is tough, but I will continue to work on it.
Comment From: randeepbydesign
Here is the code in question from `ResponseBodyEmitter`:

```java
private void sendInternal(Set<DataWithMediaType> items) throws IOException {
    if (items.isEmpty()) {
        return;
    }
    if (this.handler != null) {
        try {
            this.handler.send(items);
        }
        catch (IOException ex) {
            this.ioErrorOnSend = true;
            throw ex;
        }
        catch (Throwable ex) {
            throw new IllegalStateException("Failed to send " + items, ex);
        }
    }
    else {
        // THIS is where the memory leak occurs
        this.earlySendAttempts.addAll(items);
    }
}
```
Comment From: simonbasle
Thanks for the report @randeepbydesign. It looks like you are still investigating, but this caught my eye:

> As part of the error, it looks like the underlying handler object is set to null

It would be great if you could clarify how `this.handler` is set to null after having been initialised.

edit: please also tell us the exact version you were using when you encountered the issue 🙏
Comment From: rstoyanchev
In addition to the version information, which is crucial, have you registered an `onError` callback on `SseEmitter`? The SO report has a stack trace with a `ClientAbortException`. Do you have this too? Either way it sounds like an issue with early initialization, but you should be able to detect this via `onError`.

The Spring Framework version in the SO report, 5.0.7, is very old; it fails here while flushing when the client has gone away, and the handler initialization, a few lines below, is never reached. This was improved in db3d537e72ed07828efc456d112523ae0377d84b for Spring Framework 5.2.10. Now `SseEmitter` should notify `onError` if initialization fails early.
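For reference, registering such callbacks looks like the minimal sketch below (the controller and cleanup names are placeholders):

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class StreamController {

    @GetMapping("/stream")
    public SseEmitter stream() {
        SseEmitter emitter = new SseEmitter();
        // Standard lifecycle callbacks: these are how the application learns
        // that the connection died, the request timed out, or the stream ended.
        emitter.onError(ex -> cleanUp(emitter));
        emitter.onTimeout(() -> cleanUp(emitter));
        emitter.onCompletion(() -> cleanUp(emitter));
        return emitter;
    }

    private void cleanUp(SseEmitter emitter) {
        // application-specific deregistration, e.g. removing the emitter from a registry
    }
}
```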
Comment From: randeepbydesign
Hi, thank you for the reply, and my apologies for not being clear with these details at the outset. A common theme in our error logs is a broken pipe exception when attempting to send an SSE event:
```
java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
	...
	at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:128)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
	at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
	at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
	at com.myservice.client.notifications.service.NotificationSubscriptionService.sendToRecipient(NotificationSubscriptionService.java:107)
	at ...
```
We do have handlers defined for `onError` and `onTimeout`; however, we don't have logging specific to these being invoked. I will add that now and send it out into the field.
Comment From: randeepbydesign
Just to follow up, our `.onError` and `.onTimeout` handlers remove an SSE recipient from an internal collection we use to coordinate sending messages. Given the nature of the memory leak I don't think they are being invoked, but I will double-check.
Comment From: rstoyanchev
You can see the handling here. It either invokes `initializeWithError` (line 237) or `initialize` (line 241). There is no `ServletOutputStream` I/O before that, so I don't see much opportunity for return value handling to fail earlier with neither being called. You can also check with an `@ExceptionHandler` whether any exception occurs earlier that we are perhaps not considering.
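A minimal sketch of that check could look like this (the advice class and logger are placeholders; the idea is just to observe any exception thrown before the emitter reaches the return value handler, then rethrow so normal handling is unchanged):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

@ControllerAdvice
public class SseDiagnosticsAdvice {

    private static final Logger logger = LoggerFactory.getLogger(SseDiagnosticsAdvice.class);

    // Log every exception raised during request handling, then rethrow so the
    // usual error handling still applies.
    @ExceptionHandler(Exception.class)
    public void logAndRethrow(Exception ex) throws Exception {
        logger.warn("Exception during SSE request handling", ex);
        throw ex;
    }
}
```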
Comment From: randeepbydesign
I think I may have started this with a red herring. As part of the initial investigation I reported that:
> As part of the error, it looks like the underlying handler object is set to null
After looking at some added logs, it looks like we are handling these errors properly on a send operation:

- An IO exception is generated, probably from trying to send to a client that has disconnected
- The `.onCompletion()` handler is called
- The `.onError()` handler is called

The client is properly deregistered at that point.
We are now pivoting towards a look at the `ResponseBodyEmitter`, using reflection to see if the `org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.Handler` is never registered in the first place for some emitters. If this theory were true, the handler would always be null and the Set of messages would continue to grow.
We are setting up some reflection interceptors to monitor this in the cloud. One active question we have that could help, if you know the answer: how is `handler` populated, and how might it go wrong? Looking at the code, it happens in the method `synchronized void initialize(Handler handler) throws IOException`. `ResponseBodyEmitterReturnValueHandler` has error handling for this and the logic there seems sound, but I am not sure if there could be a bug at that point.
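A reflective probe along the lines we are deploying might look like the sketch below. The field names `handler` and `earlySendAttempts` come from the source quoted earlier and could change between Spring versions, so this is debugging scaffolding only:

```java
import java.lang.reflect.Field;
import java.util.Set;

import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

// Hypothetical reflective probe into ResponseBodyEmitter's private state.
public final class EmitterProbe {

    private EmitterProbe() {
    }

    // True once the emitter has been initialized with its Handler.
    public static boolean isInitialized(ResponseBodyEmitter emitter) throws Exception {
        Field handler = ResponseBodyEmitter.class.getDeclaredField("handler");
        handler.setAccessible(true);
        return handler.get(emitter) != null;
    }

    // Number of events buffered while waiting for initialization.
    public static int earlyBufferSize(ResponseBodyEmitter emitter) throws Exception {
        Field buffer = ResponseBodyEmitter.class.getDeclaredField("earlySendAttempts");
        buffer.setAccessible(true);
        return ((Set<?>) buffer.get(emitter)).size();
    }
}
```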
Comment From: spring-projects-issues
If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.
Comment From: randeepbydesign
My apologies: I realized I never answered some of the specific questions asked. I am using `spring-webmvc` 6.1.4 via Spring Boot 3.2.3.
Regarding the question about uncaught exceptions, there are several that occur during the course of a day. They are all Broken Pipe IOExceptions and they originate from attempts to send messages to recipients that have disconnected.
```
java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:102)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:58)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:542)
	at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:118)
	at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1381)
	at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:764)
	at org.apache.tomcat.util.net.SocketWrapperBase.flushBlocking(SocketWrapperBase.java:728)
	at org.apache.tomcat.util.net.SocketWrapperBase.flush(SocketWrapperBase.java:712)
	...
	at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:128)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
	at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
	at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
```
I have attached an image below that shows a snapshot of the heap right before the service OOMs and at the moment a fresh server starts.
Comment From: rstoyanchev
> One active question we have that could help, if you know the answer: how is `handler` populated, and how might it go wrong? Looking at the code, it happens in the method `synchronized void initialize(Handler handler) throws IOException`. `ResponseBodyEmitterReturnValueHandler` has error handling for this and the logic there seems sound, but I am not sure if there could be a bug at that point.
It's either `initialize` or `initializeWithError` that gets called. I referenced the code that does this in https://github.com/spring-projects/spring-framework/issues/33340#issuecomment-2284904798. If an exception occurs before that code, then yes, the emitter would remain uninitialized, but I don't see much possibility for that, and as I mentioned before, an `@ExceptionHandler` would detect any such unexpected exception.

Is it possible that a large number of messages is being sent immediately, during that brief period when the emitter is buffering messages before it has been initialized? If there are enough requests in parallel, the buffering for each could add up to the point where the process runs out of memory.
At the moment we don't have anything on `ResponseBodyEmitter` to notify you when it is initialized and ready to send (so it won't have to buffer). You can try to introduce a delay before the emitter is connected to Kafka, in order to allow the emitter to be initialized first and avoid the need to buffer early messages. If this makes a difference, we can introduce a callback that notifies you exactly when the emitter is initialized and ready to send.
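That experiment could be as simple as the sketch below; the executor, the delay value, and the `subscribeToKafka` hook are assumptions for illustration, not an existing API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class DelayedSubscriptionController {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @GetMapping("/notifications")
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter();
        // Give the return value handler time to initialize the emitter before
        // the Kafka consumer starts pushing events into it.
        scheduler.schedule(() -> subscribeToKafka(emitter), 500, TimeUnit.MILLISECONDS);
        return emitter;
    }

    private void subscribeToKafka(SseEmitter emitter) {
        // hook the emitter up to the Kafka consumer (application-specific)
    }
}
```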
Comment From: randeepbydesign
I think the idea of a burst of messages being sent immediately is unlikely as the service runs for several days, heap size grows during that time, and it eventually OOMs. We have some more tooling in place to collect data and I'll be sure to report back here.
Comment From: rstoyanchev
The logic for initializing `SseEmitter` is pretty tight, and I don't see much possibility for it to remain uninitialized; if it did, you would see an exception somewhere before the code I referenced above.

This is why I'm raising the possibility that the issue could be the result of a natural accumulation of early messages, which could peak at any point, even after several days.
Comment From: randeepbydesign
We isolated the root cause. There may be some blame on both sides but... it's probably mostly on my side :0)
The issue occurs because we create an `SseEmitter`, store it in a Set, and then do some validations on the request to open the SSE connection. When the validations fail, the `SseEmitter` remains in our Set and keeps getting requests to send messages to a recipient that already got a 400 response.

"Ok cool," you might say, "but when the 400 was generated, shouldn't the `ResponseBodyEmitterReturnValueHandler` have caught the exception and called `completeWithError` on the `SseEmitter`?" Well, our generic exception-handling controller advice returns the response as a JSON object. So the handler is not the SSE one, but rather the `HttpEntityMethodProcessor`. Because of this, `this.complete` remains false, `handler` is null, and we arrive at the state we are in, where messages accumulate.
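Condensed into code, the failure mode looks roughly like this (all names hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class SubscriptionController {

    static class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    private final Map<String, SseEmitter> registry = new ConcurrentHashMap<>();

    @GetMapping("/subscribe")
    public SseEmitter subscribe(@RequestParam String recipientId) {
        SseEmitter emitter = new SseEmitter();
        registry.put(recipientId, emitter); // emitter cached first...
        validateRecipient(recipientId);     // ...then validation may throw
        return emitter;                     // never reached when validation fails
    }

    private void validateRecipient(String recipientId) {
        if (recipientId.isBlank()) {
            throw new ValidationException("unknown recipient");
        }
    }

    // A generic advice like this turns the exception into a 400 body, so
    // HttpEntityMethodProcessor writes the response; the cached emitter is
    // never initialized or completed, yet it still sits in the registry.
    @ExceptionHandler(ValidationException.class)
    public ResponseEntity<String> handleValidation(ValidationException ex) {
        return ResponseEntity.badRequest().body(ex.getMessage());
    }
}
```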
Comment From: bclozel
It sounds a lot like a lifecycle issue in your application controller, then.
> There may be some blame on both sides
Is there anything we can do to prevent this situation?
Comment From: rstoyanchev
If I understand correctly, the controller method never returns? The `SseEmitter` is added to a `Set`, then validation is performed but fails, and the controller method raises an exception. In that case, we never see the `SseEmitter` and have no chance to initialize it either way.

If this is not correct, feel free to clarify. Either way, it sounds like you need to perform the validations before returning the `SseEmitter` or caching it.
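In code, that reordering amounts to something like the following sketch (continuing the hypothetical names from the previous comment): validate first, and only create and cache the emitter once the request is known to be good:

```java
@GetMapping("/subscribe")
public SseEmitter subscribe(@RequestParam String recipientId) {
    validateRecipient(recipientId);          // may throw; no emitter exists yet
    SseEmitter emitter = new SseEmitter();
    emitter.onCompletion(() -> registry.remove(recipientId));
    emitter.onError(ex -> registry.remove(recipientId));
    emitter.onTimeout(() -> registry.remove(recipientId));
    registry.put(recipientId, emitter);      // cache only after validation passed
    return emitter;
}
```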
Comment From: randeepbydesign
@rstoyanchev, I think your assessment is mostly correct. The controller method returns, but it returns a JSON object instead of the `SseEmitter`. It makes sense that, if the `SseEmitter` is never returned, it cannot be intercepted with the current pattern.
And it was clearly a mistake on our part.
However, I have a feeling that the nature of SSE means this could happen to other people. By "nature" I simply mean that to work with SSE, applications need to keep these references in memory. Once the object is created, one needs to consider exception handling at every point in the workflow after the SSE emitter is created and stored.
Failing noisily (such as with a memory leak!) might be the best way to get eyes on this. Perhaps:

- There could be a Spring-managed SSE creation service that knows about registered entities
- There could be logic in `ResponseBodyEmitter` to set a "reasonable" (I know, I hate that word too) time threshold for when `handler` is null and messages are being published (a rough sketch of an application-side version follows below)
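An application-side approximation of that second idea, reusing the reflective `EmitterProbe` sketched earlier (all names hypothetical), could periodically complete emitters that were never initialized within a threshold. Marking the emitter complete should make later `send` calls fail fast instead of silently buffering:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

// Hypothetical watchdog that flags emitters which never got initialized.
public class UninitializedEmitterWatchdog {

    private static final long THRESHOLD_MILLIS = 60_000;

    private final Map<SseEmitter, Long> createdAt = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public UninitializedEmitterWatchdog() {
        scheduler.scheduleAtFixedRate(this::sweep, 1, 1, TimeUnit.MINUTES);
    }

    // Call this wherever the application creates and caches an emitter.
    public void track(SseEmitter emitter) {
        createdAt.put(emitter, System.currentTimeMillis());
    }

    private void sweep() {
        long cutoff = System.currentTimeMillis() - THRESHOLD_MILLIS;
        createdAt.forEach((emitter, created) -> {
            try {
                if (created < cutoff && !EmitterProbe.isInitialized(emitter)) {
                    // Complete the emitter so subsequent send() calls fail loudly
                    // rather than buffering events forever.
                    emitter.completeWithError(new IllegalStateException(
                            "SseEmitter never initialized; was it returned from the controller?"));
                    createdAt.remove(emitter);
                }
            }
            catch (Exception ex) {
                // reflection unavailable; skip this emitter
            }
        });
    }
}
```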
Comment From: rstoyanchev
@randeepbydesign, to this point I don't have a good understanding of your scenario. I am trying to picture your code, but I have no idea what "validations on the request to open the SSE connection" really means, when and where an exception is thrown, whether the controller method returns or not, what it returns, and many more questions are left open. The description has come in bits and pieces without much substantive detail. I have understood enough to know that the exception occurs in your code, and therefore your code should be aware of the exception, but I really cannot engage any further given this level of detail.