This commit adds support for @Scheduled annotation on reactive methods and on Kotlin suspending functions (provided a bridge to Reactive Streams is available at runtime).

Reactive methods are methods that return an instance of Publisher or an instance of a class that the ReactiveAdapterRegistry can adapt to a Publisher with deferred support.

Kotlin suspending functions are converted to Publisher as well, using CoroutineUtils and the support of the kotlinx.coroutines.reactor bridge.

The bean method that produces the Publisher is only called once, but there can be multiple subscriptions to that instance as configured by the annotation. The usual task-based infrastructure for synchronous code is reused for the purpose of scheduling the subscriptions.

One special case is when a fixedDelay is used, as the non-blocking nature of subscribing to a Publisher makes it harder to adhere to these semantics. As a result, for that particular case only, the subscription is done in a blocking fashion inside the scheduled task.

Publisher onNext events are ignored. Active subscriptions are tracked by the processor so that long-running publishers and infinite publishers are also supported, allowing for cancellation if the associated bean is destroyed or if the context is stopped.

See gh-23533 Closes gh-29924

Comment From: sdeleuze

We should probably review #28515 at the same time and have a common approach.

Comment From: sdeleuze

As discussed, let's implement the Coroutines support as part of this PR via an invocation of CoroutinesUtils#invokeSuspendingFunction(Method, Object, Object...) when KotlinDetector#isSuspendingFunction detect a method as suspending, like we do for the Web support + related tests in a KotlinXxx.kt reusing the same logic from Java tests with suspending function instead of Mono, and if that make sense Flow instead of Flux.

Comment From: simonbasle

What if we would keep ScheduledAnnotationReactiveSupport.isReactive(method) and processScheduledReactive(scheduled, method, bean) API as they are and we just adapt the implementations to support suspending functions?

@sdeleuze done :) As stated in the latest commit, anything Publisher is still in ScheduledAnnotationReactiveSupport, but now there is only one check to perform (and the getPublisherFor method used by ReactiveTask automatically detect the correct case).

Comment From: bclozel

Thanks for tackling this - it's definitely the right time to address this as we'll work on observability in #29883.

Comment From: simonbasle

In the few last commits, I have attempted to take that approach which reuses a maximum of the existing infrastructure by using Runnable to subscribe to the Publishers. In a second step, I've attempted to reintroduce some tracking of active subscriptions. That way, Publisher subscriptions that don't terminate before the originating bean is destroyed will have a chance to be cancelled (including infinite publishers).

Comment From: simonbasle

I have now rebased on top of main to take the Antora doc changes into account, and have updated the PR body to make it a relevant commit message for the current state of the PR.

Comment From: bclozel

I'm currently working on the observability support of scheduled tasks in #29883. It looks like the current proposal for reactive methods support has a difference of behavior. Let's use two variants of scheduled methods, one blocking and the other reactive, both throwing exceptions during their processing:

@Component
public class ScheduledComponent {

    private static final Logger logger = LoggerFactory.getLogger(ScheduledComponent.class);

    @Scheduled(cron = "0,10,20,30,40,50 * * * * *")
    public void blocking() {
        logger.info("Executing 'blocking' @Scheduled method");
        throw new IllegalStateException("Blocking method failed");
    }

    @Scheduled(cron = "2,12,22,32,42,52 * * * * *")
    public Mono<Void> reactive() {
        return Mono.error(() -> new IllegalStateException("Reactive method failed"))
                .doOnError(exc -> logger.info("Executing 'reactive' @Scheduled method"))
                .then();
    }
}

The blocking variant throws the exception from the generated runnable, and it's caught by the org.springframework.util.ErrorHandler infrastructure (this can be customized on TaskScheduler implementations):

2023-06-05T10:44:30.004+02:00  INFO 17840 --- [   scheduling-1] c.example.scheduling.ScheduledComponent  : Executing 'blocking' @Scheduled method
2023-06-05T10:44:30.004+02:00 ERROR 17840 --- [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

java.lang.IllegalStateException: Blocking method failed
    at com.example.scheduling.ScheduledComponent.blocking(ScheduledComponent.java:17) ~[main/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

With the current proposal for reactive support, exceptions are never thrown and the error handling contract is not used:

2023-06-05T10:44:32.019+02:00  INFO 17840 --- [   scheduling-1] c.example.scheduling.ScheduledComponent  : Executing 'reactive' @Scheduled method
2023-06-05T10:44:32.023+02:00  WARN 17840 --- [   scheduling-1] s.s.a.ScheduledAnnotationReactiveSupport : Unexpected error occurred in scheduled reactive task

java.lang.IllegalStateException: Reactive method failed
    at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ @Scheduled 'reactive()' in bean 'com.example.scheduling.ScheduledComponent'
Original Stack Trace:
        at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55) ~[reactor-core-3.5.6.jar:3.5.6]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8671) ~[reactor-core-3.5.6.jar:3.5.6]
        at org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport$SubscribingRunnable.run(ScheduledAnnotationReactiveSupport.java:194) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-06-05T10:44:32.027+02:00 ERROR 17840 --- [   scheduling-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

I don't see this as a limitation, but really as a key difference between imperative and async in general. This behavior is already documented in this PR, but maybe we can mention that the ErrorHandler will not be invoked. From an observability perspective, I think we can probably still record failures with the observations.

Comment From: bclozel

Closed with 35052f2113274be