Hi, I continue here the issue from gitter https://gitter.im/spring-projects/spring-boot?at=5e3800b16f9d3d34981a7c43 In short - I spot the problem with cache operator while returing a Flux from controller endopoint with produce=APPLICATION_STREAM_JSON_VALUE. - This flux never ends (even if is completed) and what is worse, as I see it returns only the first value.

Repo: https://github.com/Azbesciak/FluxCacheProblemJsonStream

Comment From: bclozel

I've reproduced this issue with:

@Controller
public class TestController {

    Flux<Long> result = Flux.just(1L,2L,3L,4L).log().cache(2);

    @GetMapping(path = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    @ResponseBody
    public Flux<Long> test() {
        return this.result;
    }
}
[ctor-http-nio-2] reactor.Flux.Array.1                     : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ctor-http-nio-2] reactor.Flux.Array.1                     : | request(2)
[ctor-http-nio-2] reactor.Flux.Array.1                     : | onNext(1)
[ctor-http-nio-2] reactor.Flux.Array.1                     : | onNext(2)

On the other hand, testing with StepVerifier shows:

@Test
void test() {
    Flux<Integer> response = Flux.just(1, 2, 3, 4, 5).log().cache(1);
    StepVerifier.create(response).expectNextCount(5).verifyComplete();
}
onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onNext(5)
onComplete()

I don't know if this behavior difference is about the testing setup (and here especially the subscription demand), or if this is linked to our ChannelSendOperator or some other part of our infrastructure.

Comment From: rstoyanchev

After the first emission ChannelSendOperator is merely a passthrough. I don't suspect the issue is there. I think this is related to Reactor Core somehow.

Using the snippet from @bclozel and modifying result to:

Flux<Long> result = Flux.just(1L, 2L, 3L, 4L).log("B").cache(2).log("A");

In Boot 2.2.4 the output shows cache only ever requests 2:

2020-02-04 13:53:37.740  INFO 2018 --- [or-http-epoll-2] A                                        : | onSubscribe([Fuseable] FluxReplay.ReplayInner)
2020-02-04 13:53:37.743  INFO 2018 --- [or-http-epoll-2] A                                        : | request(1)
2020-02-04 13:53:37.745  INFO 2018 --- [or-http-epoll-2] B                                        : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2020-02-04 13:53:37.746  INFO 2018 --- [or-http-epoll-2] B                                        : | request(2)
2020-02-04 13:53:37.746  INFO 2018 --- [or-http-epoll-2] B                                        : | onNext(1)
2020-02-04 13:53:37.746  INFO 2018 --- [or-http-epoll-2] A                                        : | onNext(1)
2020-02-04 13:53:37.792  INFO 2018 --- [or-http-epoll-2] A                                        : | request(31)
2020-02-04 13:53:37.793  INFO 2018 --- [or-http-epoll-2] B                                        : | onNext(2)
2020-02-04 13:53:37.794  INFO 2018 --- [or-http-epoll-2] A                                        : | onNext(2)

In Boot 2.1.12 cache requests unbounded:

2020-02-04 13:55:39.659  INFO 2375 --- [or-http-epoll-2] A                                        : | onSubscribe([Fuseable] FluxReplay.ReplayInner)
2020-02-04 13:55:39.667  INFO 2375 --- [or-http-epoll-2] A                                        : | request(1)
2020-02-04 13:55:39.669  INFO 2375 --- [or-http-epoll-2] B                                        : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2020-02-04 13:55:39.670  INFO 2375 --- [or-http-epoll-2] B                                        : | request(unbounded)
2020-02-04 13:55:39.670  INFO 2375 --- [or-http-epoll-2] B                                        : | onNext(1)
2020-02-04 13:55:39.670  INFO 2375 --- [or-http-epoll-2] A                                        : | onNext(1)
2020-02-04 13:55:39.703  INFO 2375 --- [or-http-epoll-2] A                                        : | request(31)
2020-02-04 13:55:39.704  INFO 2375 --- [or-http-epoll-2] B                                        : | onNext(2)
2020-02-04 13:55:39.704  INFO 2375 --- [or-http-epoll-2] A                                        : | onNext(2)
2020-02-04 13:55:39.705  INFO 2375 --- [or-http-epoll-2] B                                        : | onNext(3)
2020-02-04 13:55:39.705  INFO 2375 --- [or-http-epoll-2] A                                        : | onNext(3)
2020-02-04 13:55:39.706  INFO 2375 --- [or-http-epoll-2] B                                        : | onNext(4)
2020-02-04 13:55:39.707  INFO 2375 --- [or-http-epoll-2] A                                        : | onNext(4)
2020-02-04 13:55:39.709  INFO 2375 --- [or-http-epoll-2] B                                        : | onComplete()
2020-02-04 13:55:39.709  INFO 2375 --- [or-http-epoll-2] A                                        : | onComplete()

Comment From: simonbasle

There is indeed a regression, as well as a first attempt at a fix before this issue came in and proved that attempt insufficient. Now tracked in reactor/reactor-core#2030

Comment From: rstoyanchev

Thanks, I'll close this one then.

Comment From: simonbasle

@Azbesciak the fix is available in reactor-core 3.3.3.BUILD-SNAPSHOT, can you validate against your actual code?

Comment From: Azbesciak

@simonbasle Looks ok, thanks.

PS: Could not compile via Intellij because of

Error:java: Illegal char <:> at index 116: [...]\AppData\Local\JetBrains\Toolbox\apps\IDEA-U\ch-0\193.6015.39\jbr\bin\Could not find io.projectreactor:reactor-core:3.3.3.BUILD-SNAPSHOT.

Windows 10 :)