Bug description: A Flux streaming (SSE) response doesn't work as expected.
Environment: Spring AI 1.0.0-SNAPSHOT, Java 21, Windows 10
Steps to reproduce: I tried the OpenAI completions API, and it replied with SSE content.
But the following examples return nothing:
@PostMapping(value = "/ai/openai/completions", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<OpenAiApi.ChatCompletionChunk> aiChat(@RequestBody OpenAiApi.ChatCompletionRequest chatRequest) {
final OpenAiApi openAiApi = new OpenAiApi(openaiBaseUrl, openaiApiKey);
return openAiApi.chatCompletionStream(chatRequest);
}
The same thing happens with this one:
@GetMapping(value = "/ai/generateStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatModel.stream(prompt);
}
Comment From: markpollack
@tzolov I believe SSE to Flux is handled through some internal conversions related to content type negotiation; try without the produces
element in the annotation, e.g.
@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
var prompt = new Prompt(new UserMessage(message));
return this.chatModel.stream(prompt);
}
as in the other examples in the docs.
Comment From: Mr-LiuDC
I have tried it and it's the same problem.
Comment From: markpollack
What do you observe? No response? We can make a small app to reproduce it, but I've done this type of thing many times in demos, so I feel it may be some other issue in your setup.
Comment From: Mr-LiuDC
As shown in the screenshot I posted earlier, it does not continuously stream the event data; it only responds with status code 200.
Comment From: sobychacko
@Mr-LiuDC I am unable to reproduce the issue you are seeing. Here is a simple app that uses the same components you have.
@SpringBootApplication
public class Gh1236Application {
public static void main(String[] args) {
SpringApplication.run(Gh1236Application.class, args);
}
}
@RestController
@RequestMapping("/ai")
class ChatController {
private final OpenAiChatModel chatModel;
@Autowired
public ChatController(OpenAiChatModel chatModel) {
this.chatModel = chatModel;
}
@GetMapping(value = "/generateStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> generateStream(
@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
return chatModel.stream(new Prompt(new UserMessage(message)));
}
}
When I invoke the endpoint as below, I get the streaming output without any issues.
curl localhost:8080/ai/generateStream | sed 's/data://' | jq .
Can you start with this sample code and provide us a way to reproduce the issue you are facing? Thanks!
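The sed 's/data://' step in that pipeline works because each SSE event arrives as one or more lines prefixed with data:, and a blank line ends the event. For checking the stream without sed and jq, here is a minimal Java sketch of the same extraction. SseDataExtractor is a hypothetical helper, and it is a simplification: it only handles data: lines (joining multi-line data with a newline and dropping one optional space after the colon) and ignores event:, id:, retry: and comment lines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring `sed 's/data://'`: extracts SSE "data:" payloads.
final class SseDataExtractor {
    private SseDataExtractor() {}

    /** Returns one payload per event; an event without a terminating blank line is dropped. */
    static List<String> extract(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : stream.split("\r\n|\r|\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: dispatch the event collected so far, if any.
                if (current != null) {
                    events.add(current.toString());
                    current = null;
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // a single leading space is optional
                }
                if (current == null) {
                    current = new StringBuilder(value);
                } else {
                    current.append('\n').append(value); // multiple data lines join with '\n'
                }
            }
            // Other fields (event:, id:, retry:) and ":" comments are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        String body = "data:{\"n\":1}\n\ndata:{\"n\":2}\n\n";
        System.out.println(extract(body)); // [{"n":1}, {"n":2}]
    }
}
```

Each returned string is one JSON payload, ready to hand to a JSON parser the way the pipeline hands it to jq.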
Comment From: Mr-LiuDC
I finally found the issue: it was caused by my own filter. When I disable it, streaming works fine.
package com.example.web.filter;
import com.example.common.base.BaseResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.tracing.Tracer;
import jakarta.annotation.Resource;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.util.ContentCachingRequestWrapper;
import org.springframework.web.util.ContentCachingResponseWrapper;
import java.io.IOException;
import static com.example.common.constant.BaseConstant.ApiPathConstant.ApiBasePath;
/**
* @author LDC
*/
@Slf4j
@Order(10)
@Component
public class TraceIdFilter extends OncePerRequestFilter {
@Resource
private Tracer tracer;
private final ObjectMapper mapper = new ObjectMapper();
private final AntPathMatcher antPathMatcher = new AntPathMatcher();
public static final String X_Request_Id = "X-Request-Id";
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
// log.info("TraceIdFilter - Request URI: {}", request.getRequestURI());
String traceId = tracer.currentSpan().context().traceId();
request.setAttribute(X_Request_Id, traceId);
ContentCachingRequestWrapper requestWrapper = new ContentCachingRequestWrapper(request);
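// Note: ContentCachingResponseWrapper buffers everything written to the response until copyBodyToResponse()
// is called, so streamed SSE events are held back instead of being sent as they are produced.
ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response);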
filterChain.doFilter(requestWrapper, responseWrapper);
String contentType = responseWrapper.getContentType();
HttpStatus httpStatus = HttpStatus.valueOf(responseWrapper.getStatus());
String url = requestWrapper.getRequestURI();
boolean targetUrl = antPathMatcher.match(ApiBasePath + "/**", url);
boolean jsonContentType = contentType != null && contentType.contains(MediaType.APPLICATION_JSON_VALUE);
if (targetUrl && jsonContentType) {
byte[] responseBytes = responseWrapper.getContentAsByteArray();
String responseBody = new String(responseBytes, responseWrapper.getCharacterEncoding());
String modifiedBody;
JsonNode root = mapper.readTree(responseBody);
// If it's already a BaseResponse, set its requestId field.
if (root.has("code") && root.has("timestamp") && root.has("message") && root.has("data")) {
BaseResponse<?> baseResponse = mapper.readValue(responseBody, BaseResponse.class);
baseResponse.setRequestId(traceId);
modifiedBody = mapper.writeValueAsString(baseResponse);
}
// If not, convert it to a BaseResponse
else {
BaseResponse<?> baseResponse = BaseResponse.of(httpStatus, httpStatus.getReasonPhrase(), root);
baseResponse.setRequestId(traceId);
modifiedBody = mapper.writeValueAsString(baseResponse);
}
// Rewrite to response
responseWrapper.resetBuffer();
responseWrapper.getWriter().write(modifiedBody);
}
responseWrapper.setHeader(X_Request_Id, traceId);
responseWrapper.copyBodyToResponse();
}
}
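A likely explanation (not confirmed in this thread) for the bare 200: the controller returns a Flux, so the request goes async and doFilterInternal() finishes, calling copyBodyToResponse() on a still-empty buffer, before any event is written. The events are then written into the ContentCachingResponseWrapper buffer, flushing it does not reach the client, and OncePerRequestFilter by default does not run again on the async dispatch, so nothing copies them out. The sketch below is a simplified stdlib model of that sequence plus one possible bypass rule. It is NOT Spring's real classes: DirectResponse, CachingResponse and StreamingBypass are hypothetical names, and the streaming path prefixes are an assumption to adapt to your own endpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified model of the filter's effect on streaming; NOT Spring's real classes.

/** Stand-in for the raw servlet response: "received" is what actually reached the client. */
class DirectResponse {
    final List<String> received = new ArrayList<>();

    void write(String chunk) {
        received.add(chunk);
    }
}

/** Stand-in for a caching wrapper: writes are buffered and flush() does not forward them. */
class CachingResponse {
    private final DirectResponse target;
    private final StringBuilder buffer = new StringBuilder();

    CachingResponse(DirectResponse target) {
        this.target = target;
    }

    void write(String chunk) {
        buffer.append(chunk);
    }

    void flush() {
        // Intentionally a no-op: buffered content stays here until copied.
    }

    void copyBodyToResponse() {
        if (buffer.length() > 0) {
            target.write(buffer.toString());
            buffer.setLength(0);
        }
    }
}

/** Hypothetical bypass rule: skip caching for SSE requests or known streaming paths. */
final class StreamingBypass {
    private StreamingBypass() {}

    static boolean isStreaming(String acceptHeader, String uri, List<String> streamingPrefixes) {
        if (acceptHeader != null) {
            for (String part : acceptHeader.split(",")) {
                // Drop media-type parameters such as ";q=0.9" before comparing.
                String type = part.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
                if (type.equals("text/event-stream")) {
                    return true;
                }
            }
        }
        if (uri != null) {
            for (String prefix : streamingPrefixes) {
                if (uri.startsWith(prefix)) {
                    return true;
                }
            }
        }
        return false;
    }
}

class BufferingDemo {
    public static void main(String[] args) {
        // Cached path: the filter copies the still-empty buffer, then the event arrives.
        DirectResponse viaFilter = new DirectResponse();
        CachingResponse wrapper = new CachingResponse(viaFilter);
        wrapper.copyBodyToResponse();
        wrapper.write("data:{\"n\":1}\n\n");
        wrapper.flush();
        System.out.println(viaFilter.received.size()); // 0

        // Bypass path: the event reaches the client as soon as it is written.
        DirectResponse direct = new DirectResponse();
        if (StreamingBypass.isStreaming("*/*", "/ai/generateStream", List.of("/ai/generateStream"))) {
            direct.write("data:{\"n\":1}\n\n");
        }
        System.out.println(direct.received.size()); // 1
    }
}
```

In the real filter, one way to apply such a rule is to set the X-Request-Id header and call filterChain.doFilter(request, response) with the unwrapped objects whenever it matches, keeping the caching and JSON rewriting for the other endpoints. The Accept-header check alone is not enough, because curl sends Accept: */* by default, which is why the sketch also matches on path.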