Affects: spring-web 6.1.2
Context
I am using spring-boot-starter-undertow and WebFlux.
Description
When a WebFlux controller `@RequestMapping` handler method returns a `Publisher<T>` of at least 2 elements, and `publishOn` is applied to move processing to a Scheduler other than Undertow's XNIO threads, I observe a race condition in `ChannelSendOperator.WriteBarrier#request(long n)` between my thread and the XNIO thread that is processing `onWritePossible` from the channel selector. This sounds similar to the closed issue https://github.com/spring-projects/spring-framework/issues/21098.
I can reproduce this race condition by setting a thread-suspending breakpoint on L292 and running e.g. this quick-and-dirty test case that I threw together (continue from the breakpoint after the XNIO thread parks on the object monitor indicated).
The race happens when:
1. my boundedElastic-1 thread has passed `emitCachedSignals`, writing the first element to the client and allowing the selector to propagate the "WritePossible" event on the XNIO thread, but has not yet executed `this.state = State.READY_TO_WRITE`.
2. the XNIO thread has already passed the `State.READY_TO_WRITE` check, where it would normally have requested more.
In this race condition, `s.request(n)` is not called by either thread, so the response never finishes sending.
    @Override
    public void request(long n) {
        Subscription s = this.subscription;
        if (s == null) {
            return;
        }
        if (this.state == State.READY_TO_WRITE) {
            s.request(n);
            return;
        }
        synchronized (this) { // (2) "XNIO-1 I/O-#" thread is parked here, past the READY_TO_WRITE check.
            if (this.writeSubscriber != null) {
                if (this.state == State.EMITTING_CACHED_SIGNALS) {
                    this.demandBeforeReadyToWrite = n;
                    return;
                }
                try {
                    this.state = State.EMITTING_CACHED_SIGNALS;
                    if (emitCachedSignals()) {
                        return;
                    }
                    n = n + this.demandBeforeReadyToWrite - 1;
                    if (n == 0) {
                        return; // Both threads will reach this point and return without requesting more elements.
                    }
                }
                finally {
                    this.state = State.READY_TO_WRITE; // (1) "boundedElastic-1" has not finished this statement
                }
            }
        }
        s.request(n);
    }
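To make the interleaving easier to follow, here is a minimal single-threaded replay of the two steps above. It is only a sketch, not the Spring class: `RaceReplay`, `fastPath`, `lockedPath`, and `replay` are hypothetical names, the `writeSubscriber` null check is omitted, `emitCachedSignals()` is replaced by a hook, and both callers are assumed to request `n = 1`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-threaded model of the interleaving; NOT the Spring class.
class RaceReplay {

    enum State { NEW, EMITTING_CACHED_SIGNALS, READY_TO_WRITE }

    private State state = State.NEW;
    private long demandBeforeReadyToWrite = 0;
    // Records every value that would have been passed to s.request(n).
    private final List<Long> upstreamRequests = new ArrayList<>();

    // Models the unsynchronized check at the top of request(long n).
    boolean fastPath(long n) {
        if (state == State.READY_TO_WRITE) {
            upstreamRequests.add(n);
            return true;
        }
        return false;
    }

    // Models the synchronized block; whileEmitting stands in for emitCachedSignals().
    void lockedPath(long n, Runnable whileEmitting) {
        if (state == State.EMITTING_CACHED_SIGNALS) {
            demandBeforeReadyToWrite = n;
            return;
        }
        try {
            state = State.EMITTING_CACHED_SIGNALS;
            whileEmitting.run();
            n = n + demandBeforeReadyToWrite - 1;
            if (n == 0) {
                return; // nothing is forwarded upstream
            }
        } finally {
            state = State.READY_TO_WRITE;
        }
        upstreamRequests.add(n);
    }

    static List<Long> replay() {
        RaceReplay barrier = new RaceReplay();
        boolean[] xnioHandled = new boolean[1];
        // "boundedElastic-1": while it is still emitting, the "XNIO" caller
        // runs the fast-path check and fails it (state is not READY_TO_WRITE yet).
        if (!barrier.fastPath(1)) {
            barrier.lockedPath(1, () -> xnioHandled[0] = barrier.fastPath(1));
        }
        // "XNIO": enters the block only after "boundedElastic-1" has left it.
        if (!xnioHandled[0]) {
            barrier.lockedPath(1, () -> { });
        }
        return barrier.upstreamRequests;
    }

    public static void main(String[] args) {
        System.out.println("upstream requests: " + replay());
    }
}
```

Running `main` prints `upstream requests: []`: both callers compute `n == 0` and return, which matches the stalled response described above.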
Thread Dump
The two relevant threads:
"boundedElastic-1@6727" daemon prio=5 tid=0x32 nid=NA runnable
java.lang.Thread.State: RUNNABLE
blocks XNIO-1 I/O-13@6083
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:292)
- locked <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$1.onSubscribe(AbstractListenerWriteProcessor.java:361)
at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onSubscribe(AbstractListenerWriteProcessor.java:111)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358)
at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor$State$2.onNext(AbstractListenerWriteFlushProcessor.java:293)
at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:120)
at org.springframework.http.server.reactive.AbstractListenerWriteFlushProcessor.onNext(AbstractListenerWriteFlushProcessor.java:43)
at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse.lambda$writeAndFlushWithInternal$0(AbstractListenerServerHttpResponse.java:64)
at org.springframework.http.server.reactive.AbstractListenerServerHttpResponse$$Lambda$926/0x00000001005fdd18.subscribe(Unknown Source:-1)
at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runSync(FluxPublishOn.java:366)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:524)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:840)
"XNIO-1 I/O-13@6083" prio=5 tid=0x2d nid=NA waiting for monitor entry
java.lang.Thread.State: BLOCKED
waiting for boundedElastic-1@6727 to release lock on <0x1aa4> (a org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:276)
at org.springframework.http.server.reactive.AbstractListenerWriteProcessor$State$3.onWritePossible(AbstractListenerWriteProcessor.java:415)
at org.springframework.http.server.reactive.AbstractListenerWriteProcessor.onWritePossible(AbstractListenerWriteProcessor.java:158)
at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor.lambda$new$0(UndertowServerHttpResponse.java:179)
at org.springframework.http.server.reactive.UndertowServerHttpResponse$ResponseBodyProcessor$$Lambda$929/0x0000000100602390.handleEvent(Unknown Source:-1)
at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:285)
at io.undertow.channels.DetachableStreamSinkChannel$SetterDelegatingListener.handleEvent(DetachableStreamSinkChannel.java:272)
at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
at org.xnio.conduits.WriteReadyHandler$ChannelListenerHandler.writeReady(WriteReadyHandler.java:65)
at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:94)
at org.xnio.nio.WorkerThread.run(WorkerThread.java:591)
Speculation
In the other methods of `WriteBarrier` that synchronize on `this` (`onNext`, `onError`, `onComplete`), I noticed that double-checked locking is in play for `this.state == State.READY_TO_WRITE`. Would it be correct to add the same check to the `request` method, so that we make a request upstream when we encounter this race condition?
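A rough sketch of what the double-checked variant could look like, written as a stand-alone model rather than a patch against Spring. `GuardedBarrier`, `whileEmitting`, `upstreamDemand`, and `forceInterleaving` are hypothetical names. The hook replaces `emitCachedSignals()`, the counter replaces the upstream `Subscription`, and the `writeSubscriber` null check is omitted. The driver forces the problematic interleaving by waiting until the second thread is blocked on the monitor.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-alone model of request(long n) with a second READY_TO_WRITE check
// inside the synchronized block; NOT the Spring class.
class GuardedBarrier {

    enum State { NEW, EMITTING_CACHED_SIGNALS, READY_TO_WRITE }

    private volatile State state = State.NEW;
    private long demandBeforeReadyToWrite = 0;
    // Sums what would have been passed to s.request(n).
    private final AtomicLong upstreamDemand = new AtomicLong();
    // Hypothetical hook standing in for emitCachedSignals().
    Runnable whileEmitting = () -> { };

    void request(long n) {
        if (state == State.READY_TO_WRITE) {
            upstreamDemand.addAndGet(n);
            return;
        }
        synchronized (this) {
            // Second check: the state may have become READY_TO_WRITE
            // while this thread was waiting for the monitor.
            if (state == State.READY_TO_WRITE) {
                upstreamDemand.addAndGet(n);
                return;
            }
            if (state == State.EMITTING_CACHED_SIGNALS) {
                demandBeforeReadyToWrite = n;
                return;
            }
            try {
                state = State.EMITTING_CACHED_SIGNALS;
                whileEmitting.run();
                n = n + demandBeforeReadyToWrite - 1;
                if (n == 0) {
                    return;
                }
            } finally {
                state = State.READY_TO_WRITE;
            }
        }
        upstreamDemand.addAndGet(n);
    }

    // Forces the interleaving: the second thread passes the first check,
    // then blocks on the monitor until the first thread leaves the block.
    static long forceInterleaving() {
        GuardedBarrier barrier = new GuardedBarrier();
        Thread xnio = new Thread(() -> barrier.request(1), "xnio-stand-in");
        barrier.whileEmitting = () -> {
            xnio.start();
            Thread.State s;
            do {
                Thread.onSpinWait();
                s = xnio.getState();
            } while (s != Thread.State.BLOCKED && s != Thread.State.TERMINATED);
        };
        barrier.request(1); // the current thread plays "boundedElastic-1"
        try {
            xnio.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return barrier.upstreamDemand.get();
    }

    public static void main(String[] args) {
        System.out.println("upstream demand: " + forceInterleaving());
    }
}
```

In this model the second caller finds `READY_TO_WRITE` once it acquires the monitor and forwards its demand, so `main` prints `upstream demand: 1`. Without the second check, the same run would forward nothing.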
Comment From: jhoeller
@rstoyanchev reviewing the implementation of the related methods, it seems sensible to add double-checked locking here as well, simply duplicating the `READY_TO_WRITE`/`request(n)` check at the beginning of the synchronized block, analogous to the other methods. Do you see any disadvantage there? Also, I suppose this is worth backporting to 6.0.x and 5.3.x as well.
Comment From: rstoyanchev
Thanks for the detailed analysis. It makes sense to have a similar `READY_TO_WRITE` check inside the synchronized block in `request` as well, in case the server is quick in coming back to request more from a separate thread.