依赖
<!-- Spring WebFlux starter: supplies WebClient and Reactor (Flux, Schedulers) used by the Java controller. -->
<!-- No <version> given: presumably managed by the Spring Boot parent/BOM; confirm in the pom. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
后端(Java)
package com.fan.model.paiRecord;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
/**
 * Proxies a chat request to DashScope's OpenAI-compatible chat-completions endpoint and streams the
 * generated text back to the caller chunk by chunk. When the upstream stream completes, the full
 * reply is handed to {@link #saveToDatabaseAsync}.
 */
@RestController
@RequestMapping("/ai")
public class AiStreamController {

    private static final System.Logger LOG = System.getLogger(AiStreamController.class.getName());

    private static final String API_ENDPOINT =
            "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions";

    /**
     * DashScope API key. It is read from the {@code DASHSCOPE_API_KEY} environment variable when
     * that is set; otherwise the placeholder below is used. Never commit a real key to source.
     */
    private static final String API_KEY =
            System.getenv().getOrDefault("DASHSCOPE_API_KEY", "xxxxxxxxxxxxxxx");

    /** System prompt asking the model for plain natural-language text without any markup. */
    private static final String SYSTEM_PROMPT =
            "请用纯文本回答,不要使用任何 Markdown、HTML 或富文本格式。不要加粗、不要列表、不要代码块,只输出自然语言文本。";

    /** Payload of the final event of an OpenAI-compatible chat-completion stream. */
    private static final String DONE_MARKER = "[DONE]";

    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Built once and reused for every request; a WebClient is immutable once built. */
    private final WebClient client = WebClient.builder()
            .baseUrl(API_ENDPOINT)
            .defaultHeader("Authorization", "Bearer " + API_KEY)
            .defaultHeader("Content-Type", "application/json")
            .build();

    /**
     * Sends the user's message to the model and streams the reply back as it is generated.
     *
     * @param request JSON body; must contain non-blank {@code "message"} and {@code "id"} entries
     * @return the reply text in chunks. It signals {@link IllegalArgumentException} when
     *     {@code id} or {@code message} is missing or blank
     */
    @PostMapping("/stream")
    public Flux<String> streamChat(@RequestBody Map<String, String> request) {
        String userMessage = request.get("message");
        String idStr = request.get("id");
        if (idStr == null || idStr.trim().isEmpty()) {
            return Flux.error(new IllegalArgumentException("id 不能为空"));
        }
        if (userMessage == null || userMessage.trim().isEmpty()) {
            return Flux.error(new IllegalArgumentException("消息不能为空"));
        }

        Map<String, Object> requestBody = Map.of(
                "model", "qwen-plus",
                "messages", List.of(
                        Map.of("role", "system", "content", SYSTEM_PROMPT),
                        Map.of("role", "user", "content", userMessage)),
                "stream", true,
                "stream_options", Map.of("include_usage", true));

        // Per-request state. It is only touched from the serial onNext path of this single Flux.
        StringBuilder fullResponse = new StringBuilder();
        SseEventSplitter splitter = new SseEventSplitter();

        return client.post()
                .body(BodyInserters.fromValue(requestBody))
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(DataBuffer.class)
                // Copy out and release each buffer immediately, so no pooled buffer sits in an
                // operator queue where a later cancel could leak it.
                .map(AiStreamController::readAndRelease)
                // Split events on raw bytes, so a UTF-8 character cut across buffers decodes intact.
                .concatMapIterable(splitter::feed)
                .<String>handle((event, sink) -> {
                    String payload = dataPayload(event);
                    if (payload != null) {
                        sink.next(payload);
                    }
                })
                // [DONE] ends the stream. Deltas that preceded it in the same buffer are still emitted.
                .takeWhile(payload -> !DONE_MARKER.equals(payload))
                .<String>handle((payload, sink) -> {
                    String content = extractContent(payload);
                    if (content != null) {
                        sink.next(content);
                    }
                })
                // The saved text is exactly what was sent to the client.
                .doOnNext(fullResponse::append)
                .doOnComplete(() -> saveToDatabaseAsync(userMessage, fullResponse.toString().trim(), idStr))
                .doOnError(throwable ->
                        LOG.log(System.Logger.Level.ERROR, "流处理出错: " + throwable.getMessage(), throwable));
    }

    /** Copies the readable bytes out of {@code dataBuffer}. The buffer is always released, even if reading fails. */
    private static byte[] readAndRelease(DataBuffer dataBuffer) {
        try {
            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            return bytes;
        } finally {
            DataBufferUtils.release(dataBuffer);
        }
    }

    /**
     * Returns the trimmed payload of the first {@code data:} line of an SSE event. Returns
     * {@code null} when the event has no such line.
     */
    private static String dataPayload(String event) {
        for (String line : event.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.startsWith("data:")) {
                return trimmed.substring("data:".length()).trim();
            }
        }
        return null;
    }

    /**
     * Extracts {@code choices[0].delta.content} from one chat-completion chunk.
     *
     * @return the text delta. Returns {@code null} when: the chunk has no such field (for example,
     *     an empty {@code choices} array); the content is not a non-empty JSON string (a JSON
     *     {@code null} is not turned into the text "null"); or the payload is not valid JSON
     *     (logged and skipped)
     */
    private static String extractContent(String json) {
        JsonNode root;
        try {
            root = objectMapper.readTree(json);
        } catch (IOException e) {
            // Best effort: a single malformed chunk should not abort the whole answer.
            LOG.log(System.Logger.Level.WARNING, "解析流数据失败: " + json, e);
            return null;
        }
        // path(...) yields a MissingNode instead of null, so absent fields fall through to null below.
        JsonNode content = root.path("choices").path(0).path("delta").path("content");
        if (!content.isTextual()) {
            return null;
        }
        String text = content.asText();
        return text.isEmpty() ? null : text;
    }

    /**
     * Hands one question/answer pair to a background thread. Currently a placeholder that only
     * prints both texts; {@code idStr} is accepted but not used yet.
     */
    private void saveToDatabaseAsync(String userMessage, String aiResponse, String idStr) {
        // Reactor's shared bounded-elastic scheduler is intended for blocking work. The previous
        // code created a new cached thread pool on every call and never shut it down.
        Schedulers.boundedElastic().schedule(() -> {
            try {
                System.out.println("【保存数据库】用户: " + userMessage);
                System.out.println("【保存数据库】AI: " + aiResponse);
            } catch (Exception e) {
                LOG.log(System.Logger.Level.ERROR, "保存失败: " + e.getMessage(), e);
            }
        });
    }

    /**
     * Reassembles Server-Sent Events from arbitrarily split byte chunks.
     *
     * <p>Events are delimited by a blank line ({@code "\n\n"}). The bytes of an incomplete trailing
     * event are kept until the next chunk arrives. Splitting happens on bytes before UTF-8
     * decoding, because byte 0x0A never occurs inside a multi-byte UTF-8 sequence. Not
     * thread-safe: use one instance per stream and feed it serially.
     */
    private static final class SseEventSplitter {
        private byte[] pending = new byte[0];

        /** Appends {@code chunk} and returns every event it completed, trimmed, in order. */
        List<String> feed(byte[] chunk) {
            byte[] data = new byte[pending.length + chunk.length];
            System.arraycopy(pending, 0, data, 0, pending.length);
            System.arraycopy(chunk, 0, data, pending.length, chunk.length);

            List<String> events = new ArrayList<>();
            int start = 0;
            int i = 0;
            while (i + 1 < data.length) {
                if (data[i] == '\n' && data[i + 1] == '\n') {
                    events.add(new String(data, start, i - start, StandardCharsets.UTF_8).trim());
                    start = i + 2;
                    i = start;
                } else {
                    i++;
                }
            }
            int remaining = data.length - start;
            pending = new byte[remaining];
            System.arraycopy(data, start, pending, 0, remaining);
            return events;
        }
    }
}
前端
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>AI 流式对话</title>
</head>
<body>
<h2>AI 流式回复(六爻解释)</h2>
<div id="output" style="white-space: pre-wrap; font-family: monospace;"></div>
<script>
// Demo client: POSTs one message to the streaming endpoint and appends the reply text to #output
// chunk by chunk as it arrives.
const output = document.getElementById('output');
const message = `1+1=?`;
// The controller is mapped at @RequestMapping("/ai") + @PostMapping("/stream"). An empty URL
// would POST to this page itself. Adjust the URL if the app is deployed under a context path.
const STREAM_URL = '/ai/stream';
fetch(STREAM_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    // Keep id as a string: 2008856506596999170 exceeds Number.MAX_SAFE_INTEGER and would lose
    // precision as a JS number.
    body: JSON.stringify({ message: message, id: '2008856506596999170' })
})
.then(response => {
    // fetch only rejects on network failure, so HTTP error statuses must be checked explicitly.
    if (!response.ok) {
        throw new Error('HTTP ' + response.status);
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    function read() {
        reader.read().then(({ done, value }) => {
            if (done) {
                // Flush any bytes of a multi-byte character still buffered by the streaming decoder.
                output.textContent += decoder.decode();
                console.log('流结束');
                return;
            }
            output.textContent += decoder.decode(value, { stream: true });
            read();
        }).catch(err => {
            console.error('流读取错误:', err);
        });
    }
    read();
})
.catch(err => {
    console.error('请求失败:', err);
});
</script>
</body>
</html>