Bug Description

When we use streams to output agreed upon code, there is a problem of code fragment confusion.

Reason: class OpenAiApi ``` public Flux chatCompletionStream(ChatCompletionRequest chatRequest) { Assert.notNull(chatRequest, "The request body can not be null."); Assert.isTrue(chatRequest.stream(), "Request must set the steam property to true."); AtomicBoolean isInsideTool = new AtomicBoolean(false); log.debug("openai.chat-request:{}", JsonUtils.toJson(chatRequest)); return ((WebClient.RequestBodySpec) this.webClient.post().uri("/v1/chat/completions", new Object[0])).body( Mono.just(chatRequest), ChatCompletionRequest.class).retrieve().bodyToFlux(String.class) .takeUntil(SSE_DONE_PREDICATE).filter(SSE_DONE_PREDICATE.negate()).map((content) -> { return (ChatCompletionChunk) ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class); }) .map((chunk) -> { if (this.chunkMerger.isStreamingToolFunctionCall(chunk)) { isInsideTool.set(true); }

                return chunk;
            }).windowUntil((chunk) -> {
                if (isInsideTool.get() && this.chunkMerger.isStreamingToolFunctionCallFinish(chunk)) {
                    isInsideTool.set(false);
                    return true;
                } else {
                    return !isInsideTool.get();
                }
            }).concatMapIterable((window) -> {
                Mono<ChatCompletionChunk> monoChunk = window.reduce(
                        new ChatCompletionChunk((String) null, (List) null, (Long) null, (String) null,
                                (String) null, (String) null), (previous, current) -> {
                            return this.chunkMerger.merge(previous, current);
                        });
                return List.of(monoChunk);
            }).flatMap((mono) -> {
                return mono;
            });

``` To convert data, the flatMap method uses parallel processing of chunks. When the speed is too fast, there may be a problem of chunk output being scrambled

environment JDK17

Spring 3.2.0

Spring ai 1.0.0-SNAPSHOT

Service deployment in Singapore

Copy Steps

Calling chatCompletionStream in the OpenAiApi class to implement responsive data output

Expected behavior

No chunk disorder occurs

Comment From: YunaiV

I had the same problem

Comment From: zbn116

I also encountered this issue, and trying to modify it to use concatMap can resolve it.