Affects: spring-web 6.1.2


Context

I am using spring-boot-starter-undertow and WebFlux.

Description

When a WebFlux controller @RequestMapping handler method returns a Publisher<T> of at least two elements and applies publishOn to move processing to a Scheduler other than Undertow's XNIO threads, I observe a race condition in ChannelSendOperator.WriteBarrier#request(long n) between my thread and the XNIO thread that processes onWritePossible from the channel selector. This looks similar to the previously closed issue https://github.com/spring-projects/spring-framework/issues/21098.

I can reproduce this race condition by setting a thread-suspending breakpoint on line 292 of ChannelSendOperator.java and running, for example, this quick-and-dirty test case that I threw together (resume from the breakpoint once the XNIO thread has parked on the indicated object monitor).

The race happens when:
1. my boundedElastic-1 thread has passed emitCachedSignals, writing the first element to the client and allowing the selector to propagate the "WritePossible" event to the XNIO thread, but has not yet executed this.state = State.READY_TO_WRITE; and
2. the XNIO thread has already passed the State.READY_TO_WRITE check at the top of request, where it would normally have requested more.

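In this interleaving, once boundedElastic-1 releases the lock, the XNIO thread enters the synchronized block, runs the cached-signal path a second time and also ends up with n == 0. Neither thread calls s.request(n), so no further elements are requested and the response never finishes sending.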

@Override
public void request(long n) {
    Subscription s = this.subscription;
    if (s == null) {
        return;
    }
    if (this.state == State.READY_TO_WRITE) {
        s.request(n);
        return;
    }
    synchronized (this) {                                        // (2) "XNIO-1 I/O-#" thread is parked here, past the READY_TO_WRITE check.
        if (this.writeSubscriber != null) {
            if (this.state == State.EMITTING_CACHED_SIGNALS) {
                this.demandBeforeReadyToWrite = n;
                return;
            }
            try {
                this.state = State.EMITTING_CACHED_SIGNALS;
                if (emitCachedSignals()) {
                    return;
                }
                n = n + this.demandBeforeReadyToWrite - 1;
                if (n == 0) {
                    return;                     // Both threads will reach this point and return without requesting more elements.
                }
            }
            finally {
                this.state = State.READY_TO_WRITE;  // (1) "boundedElastic-1" has not finished this statement
            }
        }
    }
    s.request(n);
}

Thread Dump

The two relevant threads:

"boundedElastic-1@6727" daemon prio=5 tid=0x32 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
     blocks XNIO-1 I/O-13@6083
      at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:292)
      - locked <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
      at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$1.onSubscribe(AbstractListenerWriteProcessor.java:361)
      at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onSubscribe(AbstractListenerWriteProcessor.java:111)
      at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358)
      at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor$State$2.onNext(AbstractListenerWriteFlushProcessor.java:293)
      at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:120)
      at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:43)
      at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
      at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
      at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
      at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
      at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse.lambda$writeAndFlushWithInternal$0(AbstractListenerServerHttpResponse.java:64)
      at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse$$Lambda$926/0x00000001005fdd18.subscribe(Unknown Source:-1)
      at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
      at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
      at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
      at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
      at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
      at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
      at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
      at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runSync(FluxPublishOn.java:366)
      at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:524)
      at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
      at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
      at java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.lang.Thread.run(Thread.java:840)


"XNIO-1 I/O-13@6083" prio=5 tid=0x2d nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
     waiting for boundedElastic-1@6727 to release lock on <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
      at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:276)
      at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$3.onWritePossible(AbstractListenerWriteProcessor.java:415)
      at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onWritePossible(AbstractListenerWriteProcessor.java:158)
      at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor.lambda$new$0(UndertowServerHttpResponse.java:179)
      at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor$$Lambda$929/0x0000000100602390.handleEvent(Unknown Source:-1)
      at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
      at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:285)
      at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:272)
      at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
      at org.xnio.conduits.WriteReadyHandler$ChannelListenerHandler.writeReady(WriteReadyHandler.java:65)
      at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:94)
      at org.xnio.nio.WorkerThread.run(WorkerThread.java:591)

Speculation

The other WriteBarrier methods that synchronize on this (onNext, onError, onComplete) use double-checked locking for this.state == State.READY_TO_WRITE: they check the state before entering the synchronized block and again inside it. Would it be correct to add the same second check to request, so that demand is still forwarded upstream when this race occurs?
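To make the proposal concrete, here is a minimal, self-contained sketch under stated assumptions. It is not Spring's code or its actual fix. The hypothetical Barrier class keeps only the state machine and demand arithmetic from the excerpt above. An AtomicLong stands in for the upstream Subscription, and emitCachedSignals is a pluggable callback. The hypothetical runRace method forces the interleaving described above: while one thread holds the lock in EMITTING_CACHED_SIGNALS, a second thread (playing the XNIO onWritePossible callback) calls request(1) and parks on the monitor. The recheckUnderLock flag toggles the proposed second READY_TO_WRITE check inside the synchronized block.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of ChannelSendOperator.WriteBarrier#request (not Spring's class).
class WriteBarrierRaceSketch {

    enum State { NEW, EMITTING_CACHED_SIGNALS, READY_TO_WRITE }

    static final class Barrier {
        final AtomicLong upstreamDemand = new AtomicLong(); // stands in for Subscription#request(n)
        private final boolean recheckUnderLock;             // true = proposed double-checked variant
        private volatile State state = State.NEW;
        private long demandBeforeReadyToWrite;               // only accessed under the lock
        BooleanSupplier emitCachedSignals = () -> false;     // true would mean a terminal signal was emitted

        Barrier(boolean recheckUnderLock) {
            this.recheckUnderLock = recheckUnderLock;
        }

        void request(long n) {
            if (this.state == State.READY_TO_WRITE) {          // first check, without the lock
                this.upstreamDemand.addAndGet(n);
                return;
            }
            synchronized (this) {
                if (this.recheckUnderLock && this.state == State.READY_TO_WRITE) {
                    this.upstreamDemand.addAndGet(n);          // proposed second check, under the lock
                    return;
                }
                if (this.state == State.EMITTING_CACHED_SIGNALS) {
                    this.demandBeforeReadyToWrite = n;         // re-entrant call during emission
                    return;
                }
                try {
                    this.state = State.EMITTING_CACHED_SIGNALS;
                    if (this.emitCachedSignals.getAsBoolean()) {
                        return;
                    }
                    n = n + this.demandBeforeReadyToWrite - 1; // one unit was used by the cached item
                    if (n == 0) {
                        return;
                    }
                }
                finally {
                    this.state = State.READY_TO_WRITE;
                }
            }
            this.upstreamDemand.addAndGet(n);
        }
    }

    /** Forces the interleaving from the issue and returns the demand that reached upstream. */
    static long runRace(boolean recheckUnderLock) {
        Barrier barrier = new Barrier(recheckUnderLock);
        AtomicBoolean cachedItemPending = new AtomicBoolean(true);
        Thread[] xnio = new Thread[1];
        barrier.emitCachedSignals = () -> {
            if (cachedItemPending.getAndSet(false)) {
                // The first call "writes" the cached item; meanwhile a second thread calls
                // request(1), passes the unlocked check and blocks on the monitor.
                xnio[0] = new Thread(() -> barrier.request(1), "xnio-sim");
                xnio[0].start();
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (xnio[0].getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                    Thread.onSpinWait();
                }
            }
            return false; // no terminal signal
        };
        barrier.request(1); // "boundedElastic" thread: first request after subscribe
        try {
            xnio[0].join();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return barrier.upstreamDemand.get();
    }

    public static void main(String[] args) {
        System.out.println("without re-check: " + runRace(false)); // 0: the XNIO thread's demand is lost
        System.out.println("with re-check:    " + runRace(true));  // 1: demand reaches upstream
    }
}
```

In this model, the unlocked first check keeps the common path lock-free. The second check only matters when a thread was already past the first check and waiting on the monitor while another thread switched the state to READY_TO_WRITE.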

Comment From: jhoeller

@rstoyanchev reviewing the implementation of the related methods, it seems sensible to add double-checked locking here as well, simply duplicating the READY_TO_WRITE/request(n) check at the beginning of the synchronized block, analogous to the other methods. Do you see any disadvantage there? Also, I suppose this is worth backporting to 6.0.x and 5.3.x as well.

Comment From: rstoyanchev

Thanks for the detailed analysis. It makes sense to have a similar READY_TO_WRITE check inside the synchronized block in request as well, in case the server comes back quickly to request more from a separate thread.