I have a case where a WebFlux application needs to perform some blocking work. Therefore, I use .subscribeOn(aBoundedElasticScheduler) in the stream.

When doing so, I've noticed that sometimes during heavy load, inbound requests are not fully read. After doing a lot of debugging, I believe I've narrowed it down to AbstractListenerReadPublisher publishing an onComplete signal before an onNext signal in some cases, leading to the onNext signal being dropped.
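Why the ordering matters: Reactive Streams rule 1.7 says no signal may follow a terminal signal. Once onComplete has gone out, a late onNext has nowhere to go. Below is a minimal, single-threaded sketch of that rule. `TerminatingSink` is a hypothetical class, not a Reactor or Spring type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded sink modelling Reactive Streams rule 1.7:
// once onComplete has been signalled, no further signals are allowed.
// Late onNext items are kept in `dropped` so the loss is visible.
class TerminatingSink<T> {
    private boolean done;
    final List<T> received = new ArrayList<>();
    final List<T> dropped = new ArrayList<>();

    void onNext(T item) {
        if (done) {
            dropped.add(item);   // arrived after onComplete: never reaches the app
        } else {
            received.add(item);
        }
    }

    void onComplete() {
        done = true;
    }
}
```

The failing log below calls onComplete() first and then onNext(buffer). In that order the buffer ends up in `dropped`, and `received` stays empty.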

  • spring-boot 2.4.5
  • spring 5.3.6
  • tomcat 9.0.45

Example log output for a request that is not fully read because AbstractListenerReadPublisher sends the onComplete signal before the onNext signal:

00:25:52,141 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] UNSUBSCRIBED -> SUBSCRIBING
00:25:52,141 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] SUBSCRIBING -> NO_DEMAND
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2b683da6] request 1
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] NO_DEMAND -> DEMAND
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2b683da6] onDataAvailable
00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] DEMAND -> READING

// onDataAvailable callback comes in from tomcat

00:25:52,141 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2b683da6] onDataAvailable

// meanwhile, reading is still being performed on a boundedElastic thread

00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.logBytesRead:263] - [2b683da6] Read 938 bytes

// onAllDataRead callback comes in from tomcat

00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2b683da6] onAllDataRead [READING]
00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] READING -> COMPLETED

// AbstractListenerReadPublisher.State.onAllDataRead sends onComplete signal

00:25:52,143 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.readAndPublish:198] - [2b683da6] Publishing DefaultDataBuffer

// AbstractListenerReadPublisher.readAndPublish sends onNext signal.
// This signal is eventually dropped, since it occurs after the onComplete sent above

For comparison, here is an example of a working request:

00:25:52,135 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2af765a0] onDataAvailable
00:25:52,137 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] UNSUBSCRIBED -> SUBSCRIBING
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] SUBSCRIBING -> NO_DEMAND
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2af765a0] request 1
00:25:52,138 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] NO_DEMAND -> DEMAND
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.onDataAvailable:117] - [2af765a0] onDataAvailable
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] DEMAND -> READING
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.logBytesRead:263] - [2af765a0] Read 938 bytes
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.readAndPublish:198] - [2af765a0] Publishing DefaultDataBuffer
00:25:52,139 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] READING -> NO_DEMAND
00:25:52,140 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.request:262] - [2af765a0] request 1
00:25:52,140 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] NO_DEMAND -> DEMAND
00:25:52,141 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2af765a0] onAllDataRead [DEMAND]
00:25:52,141 [sync-193] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2af765a0] DEMAND -> COMPLETED
00:25:52,203 [tomcat-2] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2af765a0] onAllDataRead [COMPLETED]

This is a fairly complex app, and I haven't tried to reduce it to a small reproducible example (yet).

@rstoyanchev, I know you've been in this code a lot recently for another issue I filed (#26407). Do you have any ideas off the top of your head before I dive in deeper?

Comment From: rstoyanchev

@philsttr, from the logs it seems that after the switch to a non-container thread, reading begins right away and gets intertwined with what is perhaps the first onDataAvailable call from Tomcat. That call should be ignored because we are in the READING state, given:

00:25:52,142 [sync-185] [_.s.h.s.r.AbstractListenerReadPublisher.changeState:215] - [2b683da6] DEMAND -> READING

We're still in the READING state when onAllDataRead comes from Tomcat:

00:25:52,143 [tomcat-1] [_.s.h.s.r.AbstractListenerReadPublisher.onAllDataRead:128] - [2b683da6] onAllDataRead [READING]

Meanwhile, on the sync-185 thread, we have just called isReady, performed the read, and are about to publish the data downstream. After that we'll keep reading for as long as there is downstream demand.

It seems odd, but perhaps the key here is that we are reading from a non-container thread. Tomcat then has no way of knowing when we'll try to read again, and so notifies us immediately on a container thread.
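The sequence described above can be replayed on a single thread against a simplified model of the read publisher. `NaiveReadPublisher` and its methods are hypothetical; only the state names come from the log. The model handles onAllDataRead in READING the same way as in any other state, and that is enough to reproduce the onComplete-before-onNext ordering.

```java
import java.util.ArrayList;
import java.util.List;

// State names mirror those in the AbstractListenerReadPublisher log output;
// the transitions are a simplified, hypothetical model, not Spring's code.
enum ReadState { UNSUBSCRIBED, SUBSCRIBING, NO_DEMAND, DEMAND, READING, COMPLETED }

class NaiveReadPublisher {
    ReadState state = ReadState.UNSUBSCRIBED;
    final List<String> signals = new ArrayList<>();

    void subscribe() {
        state = ReadState.SUBSCRIBING;   // UNSUBSCRIBED -> SUBSCRIBING
        state = ReadState.NO_DEMAND;     // SUBSCRIBING -> NO_DEMAND, as in the log
    }

    void request() {
        if (state == ReadState.NO_DEMAND) state = ReadState.DEMAND;
    }

    // onDataAvailable with demand: enter READING and read one buffer.
    // Returns null when the call is ignored (no demand, or already reading).
    String onDataAvailable(String available) {
        if (state != ReadState.DEMAND) return null;
        state = ReadState.READING;
        return available;
    }

    // Naive: onAllDataRead in READING is handled exactly as in DEMAND,
    // so onComplete goes out while the reader still holds a buffer.
    void onAllDataRead() {
        if (state == ReadState.COMPLETED) return;
        state = ReadState.COMPLETED;
        signals.add("onComplete");
    }

    // The reading loop publishes what it read without re-checking the state.
    void publish(String buffer) {
        signals.add("onNext(" + buffer + ")");
        if (state == ReadState.READING) state = ReadState.NO_DEMAND;
    }
}
```

The following call order matches the failing log: subscribe(), request() and onDataAvailable("938 bytes") on sync-185, then onDataAvailable (ignored) and onAllDataRead() on tomcat-1, then publish(buffer) on sync-185. It leaves `signals` as `[onComplete, onNext(938 bytes)]`.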

@markt-asf can you confirm if this is expected behavior? If so then we'll need to fix this on our end to anticipate that an onAllDataRead can come in the READING state.
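One way to anticipate onAllDataRead in the READING state is to remember the callback and complete only after the reading loop has published its buffer. The following is a hypothetical sketch of that idea. It uses simplified states and coarse `synchronized` methods, and it is not necessarily how the actual fix works.

```java
import java.util.ArrayList;
import java.util.List;

enum ReadState { NO_DEMAND, DEMAND, READING, COMPLETED }

// Hypothetical fix sketch: an onAllDataRead that arrives while the reading
// loop is active is remembered and replayed once the loop has published.
class DeferringReadPublisher {
    private ReadState state = ReadState.DEMAND;   // assume demand already exists
    private boolean completionPending;
    final List<String> signals = new ArrayList<>();

    synchronized String onDataAvailable(String available) {
        if (state != ReadState.DEMAND) return null;
        state = ReadState.READING;
        return available;
    }

    synchronized void onAllDataRead() {
        switch (state) {
            case READING:
                completionPending = true;   // reader still holds a buffer: defer
                break;
            case COMPLETED:
                break;                      // duplicate callback: ignore
            default:
                complete();                 // NO_DEMAND or DEMAND: complete now
        }
    }

    synchronized void publish(String buffer) {
        signals.add("onNext(" + buffer + ")");
        if (completionPending) {
            complete();                     // replay the deferred completion
        } else {
            state = ReadState.NO_DEMAND;
        }
    }

    synchronized ReadState state() {
        return state;
    }

    private void complete() {
        state = ReadState.COMPLETED;
        signals.add("onComplete");
    }
}
```

Replaying the failing order now yields `[onNext(938 bytes), onComplete]`. The trade-off is that completion may be delivered on the non-container thread rather than on the Tomcat thread that reported it.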

Comment From: markt-asf

Tomcat calls onAllDataRead() when all of the following are true:

  • onAllDataRead() has not been called previously for this request
  • it is known that either a) there is no request body or b) all of the request body has been read by the application

Tomcat checks the above criteria at the following points:

  • after the Servlet.service() method exits with request processing in async mode (i.e. ServletRequest.isAsyncStarted() == true)
  • after a call to onDataAvailable() returns
  • after a call to onWritePossible() returns
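The rule can be modelled as a once-only check that the container evaluates at each of those points. The check does not care which thread consumed the body, and that is what makes the race possible when the reader is a non-container thread. `AllDataReadRule` is a hypothetical model, not Tomcat code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the onAllDataRead() rule; not Tomcat source code.
class AllDataReadRule {
    private final boolean hasBody;
    private final AtomicBoolean bodyFullyRead = new AtomicBoolean();
    private final AtomicBoolean fired = new AtomicBoolean();

    AllDataReadRule(boolean hasBody) {
        this.hasBody = hasBody;
    }

    // Called by whichever thread consumes the last byte, container or not.
    void markBodyFullyRead() {
        bodyFullyRead.set(true);
    }

    // Called by the container after service() exits in async mode, after
    // onDataAvailable() returns and after onWritePossible() returns.
    // Returns true if onAllDataRead() should be invoked at this point.
    boolean atCheckPoint() {
        boolean allRead = !hasBody || bodyFullyRead.get();
        return allRead && fired.compareAndSet(false, true);   // at most once
    }
}
```

A body read to the end by, say, a boundedElastic thread makes the very next container check point fire onAllDataRead(), even if that thread has not yet published the data.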

Comment From: rstoyanchev

Thanks for clarifying. So maybe what happens is:

  • Tomcat calls onDataAvailable before there is downstream demand, i.e. we're not yet ready to read
  • concurrently, demand arrives and we call isReady and read the data from a non-container thread
  • by the time onDataAvailable returns, data has been read and Tomcat calls onAllDataRead

I suppose the assumption is that the data is read during onDataAvailable and that onAllDataRead comes next. In this case, however, the data was read in parallel from a non-container thread. Is this expected? In effect, it means we should expect onAllDataRead to arrive while we are in a reading loop.
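The three-step sequence above can be reproduced with two real threads. `InterleavingDemo` is hypothetical, and its latches force the unlucky ordering deterministically; under real load it only happens occasionally. The thread names echo the log.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Replays the three steps on two threads; latches fix the interleaving.
class InterleavingDemo {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch dataAvailableCalled = new CountDownLatch(1);
        CountDownLatch dataRead = new CountDownLatch(1);
        CountDownLatch completed = new CountDownLatch(1);

        Thread container = new Thread(() -> {
            // Step 1: onDataAvailable before downstream demand, so nothing is read.
            events.add("tomcat: onDataAvailable (no demand, ignored)");
            dataAvailableCalled.countDown();
            await(dataRead);
            // Step 3: onDataAvailable returns, the body is fully read, so the
            // container calls onAllDataRead, which completes the stream.
            events.add("tomcat: onAllDataRead -> onComplete");
            completed.countDown();
        }, "tomcat-1");

        Thread worker = new Thread(() -> {
            await(dataAvailableCalled);
            // Step 2: demand arrives; isReady() and read on a non-container thread.
            events.add("worker: read 938 bytes");
            dataRead.countDown();
            await(completed);   // the worker is still between read and publish
            events.add("worker: onNext (after onComplete)");
        }, "sync-185");

        container.start();
        worker.start();
        try {
            container.join();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The returned events always end with the onComplete event from the container thread followed by the worker's late onNext. This is the same order as in the failing log.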

Comment From: markt-asf

If the data is being read in a non-container thread, then it is certainly possible that the non-container thread will read all of the data, thereby triggering onAllDataRead(). That call could fire on a container thread before the non-container thread has finished doing whatever it needs to do with the data it has just read.

Comment From: rstoyanchev

Okay thanks for confirming. I'll work on a fix.

Comment From: philsttr

Thanks guys! I'll be happy to review and test the fix when it is available.

Comment From: rstoyanchev

@philsttr, there is a fix now in the 5.3.7-SNAPSHOT. You can give that a try.

Comment From: philsttr

Tested and confirmed working. Thanks for the quick turnaround! You rock!

Comment From: rstoyanchev

Great! Thanks for confirming ahead of the release.