This commit adds support for @Scheduled
annotation on reactive methods
and on Kotlin suspending functions (provided a bridge to Reactive Streams
is available at runtime).
Reactive methods are methods that return an instance of Publisher or an instance of a class that the ReactiveAdapterRegistry can adapt to a Publisher with deferred support.
Kotlin suspending functions are converted to Publisher as well, using
CoroutineUtils
and the support of the kotlinx.coroutines.reactor
bridge.
The bean method that produces the Publisher
is only called once, but
there can be multiple subscriptions to that instance as configured by
the annotation. The usual task-based infrastructure for synchronous code
is reused for the purpose of scheduling the subscriptions.
One special case is when a fixedDelay
is used, as the non-blocking
nature of subscribing to a Publisher makes it harder to adhere to these
semantics. As a result, for that particular case only, the subscription
is done in a blocking fashion inside the scheduled task.
Publisher onNext events are ignored. Active subscriptions are tracked by the processor so that long-running publishers and infinite publishers are also supported, allowing for cancellation if the associated bean is destroyed or if the context is stopped.
See gh-23533 Closes gh-29924
Comment From: sdeleuze
We should probably review #28515 at the same time and have a common approach.
Comment From: sdeleuze
As discussed, let's implement the Coroutines support as part of this PR via an invocation of CoroutinesUtils#invokeSuspendingFunction(Method, Object, Object...)
when KotlinDetector#isSuspendingFunction
detect a method as suspending, like we do for the Web support + related tests in a KotlinXxx.kt
reusing the same logic from Java tests with suspending function instead of Mono
, and if that make sense Flow
instead of Flux
.
Comment From: simonbasle
What if we would keep
ScheduledAnnotationReactiveSupport.isReactive(method)
andprocessScheduledReactive(scheduled, method, bean)
API as they are and we just adapt the implementations to support suspending functions?
@sdeleuze done :) As stated in the latest commit, anything Publisher
is still in ScheduledAnnotationReactiveSupport
, but now there is only one check to perform (and the getPublisherFor
method used by ReactiveTask
automatically detect the correct case).
Comment From: bclozel
Thanks for tackling this - it's definitely the right time to address this as we'll work on observability in #29883.
Comment From: simonbasle
In the few last commits, I have attempted to take that approach which reuses a maximum of the existing infrastructure by using Runnable
to subscribe to the Publishers. In a second step, I've attempted to reintroduce some tracking of active subscriptions. That way, Publisher
subscriptions that don't terminate before the originating bean is destroyed will have a chance to be cancelled (including infinite publishers).
Comment From: simonbasle
I have now rebased on top of main
to take the Antora doc changes into account, and have updated the PR body to make it a relevant commit message for the current state of the PR.
Comment From: bclozel
I'm currently working on the observability support of scheduled tasks in #29883. It looks like the current proposal for reactive methods support has a difference of behavior. Let's use two variants of scheduled methods, one blocking and the other reactive, both throwing exceptions during their processing:
@Component
public class ScheduledComponent {
private static final Logger logger = LoggerFactory.getLogger(ScheduledComponent.class);
@Scheduled(cron = "0,10,20,30,40,50 * * * * *")
public void blocking() {
logger.info("Executing 'blocking' @Scheduled method");
throw new IllegalStateException("Blocking method failed");
}
@Scheduled(cron = "2,12,22,32,42,52 * * * * *")
public Mono<Void> reactive() {
return Mono.error(() -> new IllegalStateException("Reactive method failed"))
.doOnError(exc -> logger.info("Executing 'reactive' @Scheduled method"))
.then();
}
}
The blocking variant throws the exception from the generated runnable, and it's caught by the org.springframework.util.ErrorHandler
infrastructure (this can be customized on TaskScheduler
implementations):
2023-06-05T10:44:30.004+02:00 INFO 17840 --- [ scheduling-1] c.example.scheduling.ScheduledComponent : Executing 'blocking' @Scheduled method
2023-06-05T10:44:30.004+02:00 ERROR 17840 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
java.lang.IllegalStateException: Blocking method failed
at com.example.scheduling.ScheduledComponent.blocking(ScheduledComponent.java:17) ~[main/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
With the current proposal for reactive support, exceptions are never thrown and the error handling contract is not used:
2023-06-05T10:44:32.019+02:00 INFO 17840 --- [ scheduling-1] c.example.scheduling.ScheduledComponent : Executing 'reactive' @Scheduled method
2023-06-05T10:44:32.023+02:00 WARN 17840 --- [ scheduling-1] s.s.a.ScheduledAnnotationReactiveSupport : Unexpected error occurred in scheduled reactive task
java.lang.IllegalStateException: Reactive method failed
at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ @Scheduled 'reactive()' in bean 'com.example.scheduling.ScheduledComponent'
Original Stack Trace:
at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55) ~[reactor-core-3.5.6.jar:3.5.6]
at reactor.core.publisher.Flux.subscribe(Flux.java:8671) ~[reactor-core-3.5.6.jar:3.5.6]
at org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport$SubscribingRunnable.run(ScheduledAnnotationReactiveSupport.java:194) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2023-06-05T10:44:32.027+02:00 ERROR 17840 --- [ scheduling-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
I don't see this as a limitation, but really as a key difference between imperative and async in general. This behavior is already documented in this PR, but maybe we can mention that the ErrorHandler
will not be invoked. From an observability perspective, I think we can probably still record failures with the observations.
Comment From: bclozel
Closed with 35052f2113274be