Question

Why does delaySequence() combined with concatMap() behave like delayElements(), so that a delay occurs between each pair of elements?
In contrast, delaySequence() combined with flatMapSequential() works as expected.

Is this a bug, is something missing from the documentation, or am I misunderstanding something?

I have asked the same question on Stack Overflow: https://stackoverflow.com/q/77820434/2886891

Background

There are two methods in Flux to delay the processing:

  1. delayElements()

    Delay each of this Flux elements by a given Duration.

  2. delaySequence()

    Shift this Flux forward in time by a given Duration. Unlike with delayElements(Duration), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription).

    With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, delayElements(Duration) would end up emitting at 1Hz.
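To make the arithmetic in the quoted documentation concrete, here is a minimal sketch that models the two behaviours with plain numbers. It does not use Reactor at all; the class DelayModel and its methods are hypothetical names introduced only for illustration, and the model assumes that delayElements() starts each element's delay only after the previous element has been emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timing model, NOT Reactor code: it only reproduces the
// arithmetic described in the Javadoc quoted above.
public class DelayModel {

    // delaySequence: every element is shifted by the same amount,
    // so the spacing between elements stays as in the source.
    static List<Long> delaySequence(List<Long> sourceTimesMs, long delayMs) {
        List<Long> out = new ArrayList<>();
        for (long t : sourceTimesMs) {
            out.add(t + delayMs);
        }
        return out;
    }

    // delayElements (assumed model): the delay for an element starts once the
    // element has arrived AND the previous element has been emitted.
    static List<Long> delayElements(List<Long> sourceTimesMs, long delayMs) {
        List<Long> out = new ArrayList<>();
        long previousEmission = 0;
        for (long t : sourceTimesMs) {
            previousEmission = Math.max(t, previousEmission) + delayMs;
            out.add(previousEmission);
        }
        return out;
    }

    public static void main(String[] args) {
        // A 10Hz source: one element every 100 ms, times relative to subscription.
        List<Long> source = List.of(0L, 100L, 200L);
        System.out.println(delaySequence(source, 1000)); // [1000, 1100, 1200]
        System.out.println(delayElements(source, 1000)); // [1000, 2000, 3000]
    }
}
```

In this model the delaySequence() output keeps the 100 ms spacing (10Hz) after a 1s initial shift, while the delayElements() output is spaced 1s apart (1Hz).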

Observation

When I call .delaySequence(1s).flatMapSequential(), it behaves as described above.

However, when I call .delaySequence(1s).concatMap(), it behaves like delayElements(), so a source emitting at 10Hz ends up emitting at 1Hz.

I cannot find anything in the documentation of flatMapSequential() and concatMap() which would explain this discrepancy.

The following Java code illustrates it.

delayElements() + flatMapSequential()

@Test
void delayElements_flatMapSequential() {
    log.info("delayElements_flatMapSequential");
    Flux.just("one", "two", "three")
            .delayElements(Duration.ofSeconds(1))
            .flatMapSequential(e -> {
                log.info(e);
                return Mono.just(e);
            })
            .blockLast();
}

produces an initial delay as well as a delay between each element, as expected:

15:04:03.771 delayElements_flatMapSequential
15:04:04.975 one 
15:04:05.976 two 
15:04:06.989 three 

delaySequence() + flatMapSequential()

@Test
void delaySequence_flatMapSequential() {
    log.info("delaySequence_flatMapSequential");
    Flux.just("one", "two", "three")
            .delaySequence(Duration.ofSeconds(1))
            .flatMapSequential(e -> {
                log.info(e);
                return Mono.just(e);
            })
            .blockLast();
}

produces an initial delay and no delay between each element, as expected:

15:04:10.036 delaySequence_flatMapSequential
15:04:11.047 one      <- EXPECTED DELAY
15:04:11.047 two      <- NO DELAY
15:04:11.047 three    <- NO DELAY

delaySequence() + concatMap()

@Test
void delaySequence_concatMap() {
    log.info("delaySequence_concatMap");
    Flux.just("one", "two", "three")
            .delaySequence(Duration.ofSeconds(1))
            .concatMap(e -> {
                log.info(e);
                return Mono.just(e);
            })
            .blockLast();
}

produces an initial delay as well as a delay between each element, which is NOT expected:

15:04:06.997 delaySequence_concatMap 
15:04:08.016 one      <- EXPECTED DELAY
15:04:09.027 two      <- UNEXPECTED DELAY
15:04:10.033 three    <- UNEXPECTED DELAY
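To quantify the difference, the gaps between consecutive log lines can be computed from the timestamps above. The following is a small sketch using only java.time; the class LogGaps and its method are hypothetical helper names, and the input is just the timestamps copied from the two delaySequence() outputs in this report.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the gap in milliseconds between
// consecutive HH:mm:ss.SSS timestamps taken from the log output.
public class LogGaps {

    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalTime previous = LocalTime.parse(timestamps.get(i - 1));
            LocalTime current = LocalTime.parse(timestamps.get(i));
            gaps.add(Duration.between(previous, current).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Timestamps from the delaySequence() + flatMapSequential() run.
        System.out.println(gapsMillis(List.of(
                "15:04:10.036", "15:04:11.047", "15:04:11.047", "15:04:11.047")));
        // prints [1011, 0, 0]

        // Timestamps from the delaySequence() + concatMap() run.
        System.out.println(gapsMillis(List.of(
                "15:04:06.997", "15:04:08.016", "15:04:09.027", "15:04:10.033")));
        // prints [1019, 1011, 1006]
    }
}
```

With flatMapSequential() only the first gap is about 1s, whereas with concatMap() every gap is about 1s, which is the pattern documented for delayElements().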

Comment From: bclozel

This Reactor question is well suited to Stack Overflow. Please allow some time for the community to help you; there is no need to create an issue here for that. Thanks!