Versions:
- org.springframework.boot:spring-boot-starter:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-actuator:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-mongodb:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-jpa:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-log4j2:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-web:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-webflux:2.1.1.RELEASE
- org.springframework.boot:spring-boot-starter-test:2.1.1.RELEASE
- org.springframework.cloud:spring-cloud-starter-sleuth:2.1.1.RELEASE
- org.springframework.security:spring-security-test:5.1.2.RELEASE
So I am having a very strange issue where Mono.block() hangs indefinitely. It only seems to occur when spring-boot-starter-actuator is included as a dependency and the service being called returns an error status. I'm not sure what the relation is, or whether I am doing something wrong.
Below is the code being used:
Mono<A> aMono = webClient.get().uri(...).retrieve().bodyToMono(A.class).cache();
aMono.subscribe();
aMono.block();
If the service returns a successful response, aMono.block() returns without issue. If the spring-boot-starter-actuator:2.1.1.RELEASE dependency is removed, aMono.block() returns without issue for both successful and error responses. Below is where the thread hangs:
"http-nio-8080-exec-1" #42 daemon prio=5 os_prio=0 tid=0x0000000035c5c800 nid=0x4bb4 waiting on condition [0x000000003e679000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000076ec549d0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:81)
    at reactor.core.publisher.Mono.block(Mono.java:1493)
    .. ..
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:791)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1417)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    - locked <0x000000075bd66538> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - <0x000000075769f570> (a java.util.concurrent.ThreadPoolExecutor$Worker)
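Independently of the root cause, one way to turn an indefinite park like this into a diagnosable failure is to give block() a timeout. The sketch below uses Mono.never() as a hypothetical stand-in for a cached call whose terminal signal never reaches the blocking subscriber. It does not reproduce the Actuator interaction itself; it only shows that block(Duration) gives up instead of parking the request thread forever.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class BoundedBlockSketch {

    // Hypothetical stand-in for a cached call whose terminal signal
    // never arrives at the blocking subscriber.
    static Mono<String> stuckCall() {
        return Mono.<String>never().cache();
    }

    // Returns true if block(Duration) timed out instead of waiting forever.
    static boolean timesOut(Duration limit) {
        Mono<String> aMono = stuckCall();
        aMono.subscribe();          // eager subscription, as in the original snippet
        try {
            aMono.block(limit);     // bounded wait instead of block()
            return false;
        } catch (IllegalStateException timeout) {
            // Reactor reports a block(Duration) timeout as an IllegalStateException.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut(Duration.ofMillis(200)));
    }
}
```

In a real service the timeout would be chosen per remote call; the point is only that the failure surfaces as an exception in the request thread.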
Comment From: rstoyanchev
Is it intentional that this calls both subscribe and block? Both subscribe; one just does it without blocking.
Comment From: bfrisoni
Yes, it is intentional. The use case is that some synchronous validation needs to be done using data from 4-5 different external services. So these requests are all fired and subscribed to, which initiates the calls asynchronously; once the data is needed further down the pipeline, aMono.block() is called to return the cached response. Let me know if this is clear. I am doing some additional troubleshooting, and it seems that when the subscription consumes the error, the hang does not occur. For example:
aMono.doOnError(throwable -> System.out.println(throwable));
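To illustrate why cache() matters in this pattern: without it, subscribe() and block() each subscribe to the source, which for a WebClient call would mean two HTTP requests; with it, the source is subscribed once and the result is replayed to the second subscriber. A minimal sketch, assuming Mono.fromCallable as a hypothetical stand-in for the remote call:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheSubscriptionSketch {

    // Counts how many times the underlying source is subscribed to
    // when the caller does subscribe() followed by block().
    static int sourceSubscriptions(boolean cached) {
        AtomicInteger subscriptions = new AtomicInteger();

        // Hypothetical stand-in for webClient...bodyToMono(A.class).
        Mono<String> call = Mono.fromCallable(() -> "response")
                .doOnSubscribe(s -> subscriptions.incrementAndGet());

        Mono<String> aMono = cached ? call.cache() : call;
        aMono.subscribe();   // fire the call early
        aMono.block();       // read the result later
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("cached: " + sourceSubscriptions(true));
        System.out.println("uncached: " + sourceSubscriptions(false));
    }
}
```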
Comment From: bfrisoni
It looks like my last comment, stating that aMono.block() does not hang if the error is consumed, is incorrect: it still hangs. However, if I step into aMono.block() with the debugger attached, the thread does not hang.
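One thing worth checking in the doOnError snippet (an observation about how Reactor operators work, not a confirmed diagnosis of this hang): doOnError returns a new Mono and does not modify aMono, so calling it and discarding the result attaches nothing. The hook only runs when it is part of the chain that is actually subscribed. The counts below use a hypothetical failing Mono in place of the WebClient call:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DoOnErrorSketch {

    // doOnError called on aMono, but the returned Mono is thrown away.
    static int discardedResultCalls() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> aMono = Mono.<String>error(new RuntimeException("boom")).cache();
        aMono.doOnError(t -> calls.incrementAndGet()); // result discarded: no effect
        aMono.subscribe(v -> { }, t -> { });           // subscribes without the hook
        return calls.get();
    }

    // doOnError placed in the chain that is actually subscribed.
    static int chainedCalls() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> aMono = Mono.<String>error(new RuntimeException("boom"))
                .doOnError(t -> calls.incrementAndGet())
                .cache();
        aMono.subscribe(v -> { }, t -> { });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("discarded: " + discardedResultCalls());
        System.out.println("chained: " + chainedCalls());
    }
}
```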
Comment From: rstoyanchev
The use case is that there is some synchronous validation that needs to be done by utilizing data in a 4-5 different external services. So these requests are all fired and subscribed to initiate the call asynchronously, once the data is needed down the pipe a aMono.block() is called to return this cached response.
Perhaps the snippet shown is too basic, but I still don't understand the reason for caching, subscribing, and blocking. If you need to call multiple services and validate once all have returned, I would expect that to be composed into a single chain. I have too little information to work with, but based on the given description it could be something like this:
Mono<A> aMono = webClient.get().uri(...).retrieve().bodyToMono(A.class);
Mono<B> bMono = webClient.get().uri(...).retrieve().bodyToMono(B.class);
Mono<C> cMono = webClient.get().uri(...).retrieve().bodyToMono(C.class);
Mono<D> dMono = webClient.get().uri(...).retrieve().bodyToMono(D.class);
Mono.zip(aMono, bMono, cMono, dMono)
        .doOnNext(tuple -> {
            // tuple.getT1() .. tuple.getT4() hold the A, B, C and D results
            // ...
        })
        .block();
Comment From: bfrisoni
Thanks @rstoyanchev for the reply. Let me expand a little on the current use case and the limitations of your suggestion:
1. Mono.zip(..) maxes out at 8 parameters, I believe.
2. With your current implementation, the application would completely block until all requests have responded, which is what I was trying to avoid.
Instead, with my current solution I am able to:
1. Fire all 5 requests (there could be more than 5 as the application grows) and subscribe asynchronously.
2. Begin computations to validate a bunch of other rules that need to be satisfied, some of which may also require synchronous RDBMS lookups of validation data owned locally by the application.
3. While #2 is computing, all my requests are processed asynchronously in the background, and the application only blocks once it reaches the rules that actually require the external web service data. By that point most responses have been received and cached, so Mono.block() returns immediately.
Again, the thread only hangs if the actuator module is enabled AND one of the responses returns an error. If I remove the actuator module OR all responses return without error, mono.block() does not hang. I have yet to test whether this also occurs when blocking on a Flux (blockFirst()/blockLast()) under the same conditions.
Comment From: rstoyanchev
@bfrisoni
- Mono.zip(..) maxes out I believe at 8 parameters
It doesn't max out. Please check the variant that takes an Iterable.
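For reference, a sketch of the Iterable-based Mono.zip(Iterable, Function) variant, which accepts any number of sources and hands the combinator an Object[] of results in source order. The ten Mono.just(...) sources are hypothetical stand-ins for the remote calls, more than the eight that the fixed-arity zip overloads accept:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Mono;

public class ZipIterableSketch {

    // Zips any number of Monos; results arrive as an Object[]
    // in the same order as the sources in the list.
    static List<Object> zipAll(List<Mono<?>> calls) {
        return Mono.zip(calls, results -> Arrays.asList(results)).block();
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for ten remote calls.
        List<Mono<?>> calls = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            calls.add(Mono.just("response-" + i));
        }
        System.out.println(zipAll(calls));
    }
}
```

Because the combinator sees Object[], each result has to be cast back to its type, which is the trade-off for the unbounded arity.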
2.With your current implementation, the application would completely block until all requests
It's not an implementation of anything. It's a sketch based on a very incomplete description.
1.Fire all 5 ... 2.Begin computations ... 3.While number #2 is computing, ...
All of this sounds quite feasible with 1) and 2) modeled as independent streams to be joined when complete, and 1) itself consisting of 5 sub-streams joined with zip. I would ask you to reconsider the assumption that this can only be done with cache, async, and block, but this is now turning into a discussion better suited for Stack Overflow.
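A minimal sketch of that shape, under stated assumptions: remoteA()/remoteB() are hypothetical stand-ins for the WebClient calls, localRulesPass() is a hypothetical placeholder for the blocking local validation, and Schedulers.boundedElastic() requires Reactor 3.3.9 or later (on the Reactor 3.2 line used by Boot 2.1, Schedulers.elastic() played this role). Both streams start when the outer zip subscribes, so the local work overlaps with the remote calls, and block() is called once at the end:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public class JoinedValidationSketch {

    // Hypothetical stand-ins for two of the remote calls.
    static Mono<String> remoteA() {
        return Mono.just("a").delayElement(Duration.ofMillis(50));
    }

    static Mono<Integer> remoteB() {
        return Mono.just(42).delayElement(Duration.ofMillis(30));
    }

    // Hypothetical local rule check doing blocking work (e.g. an RDBMS lookup).
    static boolean localRulesPass() {
        return true;
    }

    static String validate() {
        // Stream 1): the remote calls, joined with zip.
        Mono<Tuple2<String, Integer>> remote = Mono.zip(remoteA(), remoteB());

        // Stream 2): the blocking local validation, moved off the caller's thread.
        Mono<Boolean> local = Mono.fromCallable(JoinedValidationSketch::localRulesPass)
                .subscribeOn(Schedulers.boundedElastic());

        // Join both streams; an error from either terminates the joined Mono
        // and is rethrown by block().
        return Mono.zip(remote, local)
                .map(both -> both.getT2()
                        ? both.getT1().getT1() + ":" + both.getT1().getT2()
                        : "rejected")
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(validate());
    }
}
```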
I'm afraid there isn't anything I can do without a sample that demonstrates the issue. I can't tell you why it blocks based on a loose description of a complex scenario. In addition, if Actuator is the cause, that can't be fixed here in the Spring Framework.
Comment From: bfrisoni
@rstoyanchev Thanks for the feedback. I will create a sample project demonstrating the issue.
Comment From: rstoyanchev
Okay, if you get that, feel free to comment here for consideration.
Comment From: bfrisoni
I never circled back, but I was able to get this to work by registering an error consumer. Without this consumer, it seems that if the first subscription (aMono.subscribe()) receives an error, the error is not "cached" and replayed to the second subscription (aMono.block()), leaving the second subscription waiting for a signal that of course never arrives. I'm not sure whether this is expected behavior, but I'm posting it in case anyone else has a similar use case.
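A minimal sketch of the workaround described above: registering an error consumer on the eager subscribe() call. RemoteServiceException and failingCall() are hypothetical stand-ins for the WebClient error response. The sketch does not reproduce the Actuator-specific hang; it only shows that, with the consumer in place, the cached error is replayed to the later block() call, which rethrows it:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class ErrorConsumerSketch {

    // Hypothetical exception standing in for the WebClient error-status failure.
    static class RemoteServiceException extends RuntimeException {
        RemoteServiceException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for webClient...retrieve().bodyToMono(A.class)
    // when the remote service answers with an error status.
    static Mono<String> failingCall() {
        return Mono.error(new RemoteServiceException("remote service returned 500"));
    }

    static String fetch() {
        Mono<String> aMono = failingCall().cache();

        // Eager subscription WITH an error consumer, so the first
        // subscriber handles the error itself.
        aMono.subscribe(
                value -> { },
                error -> System.err.println("prefetch failed: " + error.getMessage()));

        // Later: the cached error is replayed and block() rethrows it.
        // The timeout is an extra guard against waiting forever.
        return aMono.block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        try {
            fetch();
        } catch (RemoteServiceException e) {
            System.out.println("block() rethrew: " + e.getMessage());
        }
    }
}
```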