I have a case where a WebFlux application needs to perform some blocking work. Therefore, I use .subscribeOn(aBoundedElasticScheduler) in the stream.
When doing so, I've noticed that under heavy load, inbound requests are sometimes not fully read. After a lot of debugging, I believe I've narrowed it down to AbstractListenerReadPublisher. In some cases it publishes an onComplete signal before an onNext signal, and the onNext signal is then dropped.
- spring-boot 2.4.5
- spring 5.3.6
- tomcat 9.0.45
Example logging for a request that is not fully read because AbstractListenerReadPublisher sends the onComplete signal before the onNext signal:
00:25:52,141 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] UNSUBSCRIBED -> SUBSCRIBING
00:25:52,141 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] SUBSCRIBING -> NO_DEMAND
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2b683da6] request 1
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] NO_DEMAND -> DEMAND
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2b683da6] onDataAvailable
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] DEMAND -> READING
// onDataAvailable callback comes in from tomcat
00:25:52,141 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2b683da6] onDataAvailable
// meanwhile, reading is still being performed on a boundedElastic thread
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.logBytesRead:263] - [2b683da6] Read 938 bytes
// onAllDataRead callback comes in from tomcat
00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2b683da6] onAllDataRead [READING]
00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] READING -> COMPLETED
// AbstractListenerReadPublisher.State.onAllDataRead sends onComplete signal
00:25:52,143 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.readAndPublish:198] - [2b683da6] Publishing DefaultDataBuffer
// AbstractListenerReadPublisher.readAndPublish sends onNext signal.
// This signal is eventually dropped, since it occurs after the onComplete sent above
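The sketch below makes the failure mode concrete. It is a minimal single-threaded model that replays the order of events from the failing log above. The classes RecordingSubscriber, NaiveReadPublisher and LostSignalDemo are hypothetical simplifications and are not Spring's AbstractListenerReadPublisher. The subscriber is assumed to discard any onNext that arrives after completion, which is similar to how Reactor operators treat such signals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subscriber: ignores onNext once a terminal signal has been received.
class RecordingSubscriber {
    final List<String> received = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();
    private boolean done;

    void onNext(String item) {
        if (done) {
            dropped.add(item); // signal after onComplete: discarded
            return;
        }
        received.add(item);
    }

    void onComplete() {
        done = true;
        received.add("complete");
    }
}

// Hypothetical, heavily simplified read publisher (NOT Spring's code).
// State names mirror the log output.
class NaiveReadPublisher {
    enum State { DEMAND, READING, NO_DEMAND, COMPLETED }

    State state = State.DEMAND;
    private final RecordingSubscriber subscriber;

    NaiveReadPublisher(RecordingSubscriber subscriber) {
        this.subscriber = subscriber;
    }

    // Reading thread: DEMAND -> READING, then read one chunk (not yet published).
    String read(String chunk) {
        state = State.READING;
        return chunk;
    }

    // Container thread: the naive version completes immediately, even in READING.
    void onAllDataRead() {
        state = State.COMPLETED;
        subscriber.onComplete();
    }

    // Reading thread: publish the chunk that was read earlier.
    void publish(String chunk) {
        subscriber.onNext(chunk);
        if (state == State.READING) {
            state = State.NO_DEMAND;
        }
    }
}

class LostSignalDemo {
    // Replays the interleaving from the failing log on a single thread.
    static RecordingSubscriber replayFailingOrder() {
        RecordingSubscriber sub = new RecordingSubscriber();
        NaiveReadPublisher pub = new NaiveReadPublisher(sub);
        String chunk = pub.read("938 bytes"); // [sync-185] DEMAND -> READING, Read 938 bytes
        pub.onAllDataRead();                  // [tomcat-1] READING -> COMPLETED
        pub.publish(chunk);                   // [sync-185] Publishing DefaultDataBuffer
        return sub;
    }

    public static void main(String[] args) {
        RecordingSubscriber sub = replayFailingOrder();
        System.out.println("received=" + sub.received + " dropped=" + sub.dropped);
    }
}
```

Running main prints received=[complete] dropped=[938 bytes]. The completion signal reaches the subscriber first, so the data that was already read is lost.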
For comparison, here is an example of a working request:
00:25:52,135 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2af765a0] onDataAvailable
00:25:52,137 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] UNSUBSCRIBED -> SUBSCRIBING
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] SUBSCRIBING -> NO_DEMAND
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2af765a0] request 1
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] NO_DEMAND -> DEMAND
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2af765a0] onDataAvailable
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] DEMAND -> READING
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.logBytesRead:263] - [2af765a0] Read 938 bytes
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.readAndPublish:198] - [2af765a0] Publishing DefaultDataBuffer
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] READING -> NO_DEMAND
00:25:52,140 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2af765a0] request 1
00:25:52,140 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] NO_DEMAND -> DEMAND
00:25:52,141 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2af765a0] onAllDataRead [DEMAND]
00:25:52,141 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] DEMAND -> COMPLETED
00:25:52,203 [tomcat-2] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2af765a0] onAllDataRead [COMPLETED]
This is a pretty complex app, and I haven't tried to isolate it down into a small reproducible app (yet).
@rstoyanchev, I know you've been in this code a lot recently for another issue I filed (#26407). Do you have any ideas off the top of your head before I dive in deeper?
Comment From: rstoyanchev
@philsttr, from the logs it seems that after the switch to a non-container thread, reading begins right away and gets interleaved with what is perhaps the first onDataAvailable call from Tomcat. That call should be ignored because we are already in the READING state, given:
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] DEMAND -> READING
We're still in the READING state when onAllDataRead comes from Tomcat:
00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2b683da6] onAllDataRead [READING]
Meanwhile, we have just called isReady, performed the read, and are about to publish the data downstream. After that, we'll keep doing the same as long as there is demand from downstream.
It seems odd, but maybe the key here is that we are reading from a non-container thread. Perhaps Tomcat has no way of knowing when we'll try to read again, so it notifies us immediately on a container thread.
@markt-asf, can you confirm whether this is expected behavior? If so, we'll need to fix this on our end to anticipate that onAllDataRead can arrive while we are in the READING state.
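One way to anticipate onAllDataRead in the READING state is to defer completion. The publisher records that onAllDataRead arrived, and the reading thread emits onComplete only after it has published what it read. The sketch below is a hypothetical model of that idea and is not the change that was actually made in Spring. It uses synchronized methods for simplicity, whereas a real publisher would avoid signalling downstream while holding a lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Spring's AbstractListenerReadPublisher: an onAllDataRead
// that arrives while a read is in progress is deferred instead of completing at once.
class DeferringReadPublisher {
    enum State { DEMAND, READING, NO_DEMAND, COMPLETED }

    private State state = State.DEMAND;
    private boolean completionPending; // set when onAllDataRead arrives in READING
    final List<String> signals = new ArrayList<>();

    // Reading thread: DEMAND -> READING, read one chunk (not yet published).
    synchronized String read(String chunk) {
        state = State.READING;
        return chunk;
    }

    // Container thread: complete now, unless a read is in progress.
    synchronized void onAllDataRead() {
        if (state == State.READING) {
            completionPending = true;
        } else if (state != State.COMPLETED) {
            complete();
        }
    }

    // Reading thread: publish the chunk, then honour any deferred completion.
    synchronized void publish(String chunk) {
        signals.add("onNext(" + chunk + ")");
        if (completionPending) {
            complete();
        } else {
            state = State.NO_DEMAND;
        }
    }

    private void complete() {
        state = State.COMPLETED;
        signals.add("onComplete");
    }
}

class DeferredCompletionDemo {
    // Same interleaving as the failing log, replayed on one thread.
    static List<String> replayFailingOrder() {
        DeferringReadPublisher pub = new DeferringReadPublisher();
        String chunk = pub.read("938 bytes");
        pub.onAllDataRead(); // arrives in READING: deferred
        pub.publish(chunk);  // onNext, then the deferred onComplete
        return pub.signals;
    }

    public static void main(String[] args) {
        System.out.println(replayFailingOrder()); // [onNext(938 bytes), onComplete]
    }
}
```

When onAllDataRead arrives outside of READING, for example in NO_DEMAND or DEMAND as in the working log, it still completes immediately.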
Comment From: markt-asf
Tomcat calls onAllDataRead() when all of the following are true:
- onAllDataRead() has not been called previously for this request
- it is known that a) there is no request body or b) all of the request body has been read by the application
Tomcat checks the above criteria at the following points:
- after the Servlet.service() method exits with request processing in async mode (i.e. ServletRequest.isAsyncStarted() == true)
- after a call to onDataAvailable() returns
- after a call to onWritePossible() returns
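The rule above can be modeled as a once-only check that runs at each of the listed points. This is a hypothetical illustration. The class and method names are invented and do not correspond to Tomcat's internal code.

```java
// Hypothetical model of the rule described above; not Tomcat's internal code.
class AllDataReadNotifier {
    private final boolean hasBody;
    private boolean bodyFullyRead;
    private boolean notified; // has onAllDataRead() already been called?
    private int calls;        // counts simulated onAllDataRead() invocations

    AllDataReadNotifier(boolean hasBody) {
        this.hasBody = hasBody;
    }

    // The application has consumed the last byte of the request body.
    void applicationReadWholeBody() {
        bodyFullyRead = true;
    }

    // Run after service() exits in async mode, after onDataAvailable() returns,
    // and after onWritePossible() returns.
    void checkpoint() {
        if (!notified && (!hasBody || bodyFullyRead)) {
            notified = true;
            calls++; // stands in for ReadListener.onAllDataRead()
        }
    }

    int onAllDataReadCalls() {
        return calls;
    }
}

class NotifierDemo {
    public static void main(String[] args) {
        AllDataReadNotifier withBody = new AllDataReadNotifier(true);
        withBody.checkpoint();               // service() exits: body not read yet
        withBody.applicationReadWholeBody();
        withBody.checkpoint();               // onDataAvailable() returns: fires
        withBody.checkpoint();               // onWritePossible() returns: already fired
        System.out.println(withBody.onAllDataReadCalls()); // 1
    }
}
```

The check does not care which thread read the body. It only looks at whether the body has been fully consumed by the time a checkpoint runs.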
Comment From: rstoyanchev
Thanks for clarifying. So maybe what happens is:
- Tomcat calls onDataAvailable before there is downstream demand, i.e. we're not yet ready to read
- concurrently, demand arrives and we call isReady and read the data from a non-container thread
- by the time onDataAvailable returns, the data has been read and Tomcat calls onAllDataRead
I suppose the assumption is that the data is read during onDataAvailable and that onAllDataRead comes next. In this case, however, the data was read in parallel from a non-container thread. Is this expected? In effect, it means we should expect onAllDataRead to arrive while we are in a reading loop.
Comment From: markt-asf
If the data is being read in a non-container thread, then it is certainly possible that the non-container thread will read all of the data and thereby trigger onAllDataRead(). That call could fire on a container thread before the non-container thread has finished doing whatever it needs to do with the data it has just read.
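The cross-thread ordering described above can be reproduced deterministically with two threads. In this hypothetical simulation, CountDownLatch instances force the interleaving. The worker thread stands in for the boundedElastic thread and drains the body while the container thread is inside onDataAvailable(). The worker still holds the buffer when the container thread returns from onDataAvailable() and records onAllDataRead(). The event strings are illustrative labels, not real log output.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of a container thread and a non-container worker thread.
class CrossThreadRace {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch bodyDrained = new CountDownLatch(1);      // worker has read everything
        CountDownLatch allDataReadFired = new CountDownLatch(1); // container has fired onAllDataRead
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Container thread enters onDataAvailable(); demand arrives concurrently.
            events.add("container: onDataAvailable");
            worker.submit(() -> {
                events.add("worker: read 938 bytes");
                bodyDrained.countDown();
                try {
                    allDataReadFired.await(); // still holding the buffer, not yet published
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("worker: publish buffer");
            });
            bodyDrained.await();
            // onDataAvailable() returns; the body is fully read, so onAllDataRead() fires.
            events.add("container: onAllDataRead");
            allDataReadFired.countDown();
            worker.shutdown();
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The returned order is always onDataAvailable, read, onAllDataRead, publish. The reading side therefore has to tolerate completion arriving before it publishes.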
Comment From: rstoyanchev
Okay thanks for confirming. I'll work on a fix.
Comment From: philsttr
Thanks guys! I'll be happy to review and test the fix when it is available.
Comment From: rstoyanchev
@philsttr, there is a fix now in the 5.3.7-SNAPSHOT. You can give that a try.
Comment From: philsttr
Tested and confirmed working. Thanks for the quick turnaround! You rock!
Comment From: rstoyanchev
Great! Thanks for confirming ahead of the release.