Bug description Flux flow response doesn't seem to work as expected.

Environment Spring AI 1.0.0-SNAPSHOT Java version 21 Windows 10

Steps to reproduce I Tried OpenAI completions API, it replied SSE content

openai chat completions

But the following examples replied nothing.

1723901927764

@PostMapping(value = "/ai/openai/completions", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<OpenAiApi.ChatCompletionChunk> aiChat(@RequestBody OpenAiApi.ChatCompletionRequest chatRequest) {
    final OpenAiApi openAiApi = new OpenAiApi(openaiBaseUrl, openaiApiKey);
    return openAiApi.chatCompletionStream(chatRequest);
}

That's the same

@GetMapping(value = "/ai/generateStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
    Prompt prompt = new Prompt(new UserMessage(message));
    return chatModel.stream(prompt);
}

Comment From: markpollack

@tzolov I believe SSE to Flux is handled based on some internal conversions related to content type negotation, try without the produces element in the annotation, e.g.


    @GetMapping("/ai/generateStream")
    public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        var prompt = new Prompt(new UserMessage(message));
        return this.chatModel.stream(prompt);
    }

and other examples in the docs.

Comment From: Mr-LiuDC

I have tried it and it's the same problem.

Comment From: markpollack

what do you observe? no response? We can make a small app to reproduce, but i've done this type of stuff many times in demos so feel it maybe some other issue in your setup.

Comment From: Mr-LiuDC

what do you observe? no response? We can make a small app to reproduce, but i've done this type of stuff many times in demos so feel it maybe some other issue in your setup.

As I pointed out in the previous picture, it does not continuously output the content of event data, but only responded with a status code of 200.

Comment From: sobychacko

@Mr-LiuDC I am unable to reproduce the issue you are seeing. Here is a simple app that uses the same components you have.

@SpringBootApplication
public class Gh1236Application {
    public static void main(String[] args) {
        SpringApplication.run(Gh1236Application.class, args);
    }
}

@RestController
@RequestMapping("/ai")
class ChatController {
    private final OpenAiChatModel chatModel;

    @Autowired
    public ChatController(OpenAiChatModel chatModel) {
        this.chatModel = chatModel;
    }

    @GetMapping(value = "/generateStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> generateStream(
            @RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
        return chatModel.stream(new Prompt(new UserMessage(message)));
    }
}

When I invoke the endpoint as below, I get the streaming output without any issues.

curl localhost:8080/ai/generateStream | sed 's/data://' | jq .

Can you start with this sample code and provide us a way to reproduce the issue you are facing? Thanks!

Comment From: Mr-LiuDC

I finally found the issue—it was caused by my own filter. When I disable it, it works fine.

package com.example.web.filter;

import com.example.common.base.BaseResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.tracing.Tracer;
import jakarta.annotation.Resource;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.util.ContentCachingRequestWrapper;
import org.springframework.web.util.ContentCachingResponseWrapper;

import java.io.IOException;

import static com.example.common.constant.BaseConstant.ApiPathConstant.ApiBasePath;

/**
 * @author LDC
 */
@Slf4j
@Order(10)
@Component
public class TraceIdFilter extends OncePerRequestFilter {

    @Resource
    private Tracer tracer;
    private final ObjectMapper mapper = new ObjectMapper();
    private final AntPathMatcher antPathMatcher = new AntPathMatcher();
    public static final String X_Request_Id = "X-Request-Id";

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        // log.info("TraceIdFilter - Request URI: {}", request.getRequestURI());
        String traceId = tracer.currentSpan().context().traceId();
        request.setAttribute(X_Request_Id, traceId);
        ContentCachingRequestWrapper requestWrapper = new ContentCachingRequestWrapper(request);
        ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response);
        filterChain.doFilter(requestWrapper, responseWrapper);

        String contentType = responseWrapper.getContentType();
        HttpStatus httpStatus = HttpStatus.valueOf(responseWrapper.getStatus());
        String url = requestWrapper.getRequestURI();
        boolean targetUrl = antPathMatcher.match(ApiBasePath + "/**", url);
        boolean jsonContentType = contentType != null && contentType.contains(MediaType.APPLICATION_JSON_VALUE);

        if (targetUrl && jsonContentType) {
            byte[] responseBytes = responseWrapper.getContentAsByteArray();
            String responseBody = new String(responseBytes, responseWrapper.getCharacterEncoding());

            String modifiedBody;
            JsonNode root = mapper.readTree(responseBody);
            // If it's BaseResponse then set requestId fied.
            if (root.has("code") && root.has("timestamp") && root.has("message") && root.has("data")) {
                BaseResponse<?> baseResponse = mapper.readValue(responseBody, BaseResponse.class);
                baseResponse.setRequestId(traceId);
                modifiedBody = mapper.writeValueAsString(baseResponse);
            }
            // If not, convert it to a BaseResponse
            else {
                BaseResponse<?> baseResponse = BaseResponse.of(httpStatus, httpStatus.getReasonPhrase(), root);
                baseResponse.setRequestId(traceId);
                modifiedBody = mapper.writeValueAsString(baseResponse);
            }

            // Rewrite to response
            responseWrapper.resetBuffer();
            responseWrapper.getWriter().write(modifiedBody);
        }

        responseWrapper.setHeader(X_Request_Id, traceId);
        responseWrapper.copyBodyToResponse();
    }
}