When I use

Flux.interval(Duration.ofSeconds(1)).filter(tick -> {
  log.info("tick: {}", tick);
  return status;
}).take(1).flatMap(tick -> foo()).subscribe(); 

to perform a periodic check task, I expect foo() to be triggered when status=true. However, first of all, I must explain that this is occasional. When the content of log.info("tick: {}", tick); is not printed in the log, the foo() method is unexpectedly called, which means that the filter method occasionally fails, which makes me very distressed. About the definition of the status variable:

private boolean status;

I tried to locate and reproduce the problem stably by myself, but unfortunately failed. I don’t think this is a concurrency and thread safety issue because no matter how the status changes, when the stream meets the conditions, the relevant logs should be printed, and I need to use status in foo(), but it is false, which means that the only reason that can explain it seems to be that the filter method fails?

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.4.1</version>
  <relativePath/>
</parent>

<java.version>17</java.version>

Comment From: wilkinsona

The Flux API that you are using is part of Reactor.