在 Spring WebFlux 中,当 Controller 返回 Flux
时,前端可以通过多种方式接收这个异步、流式的数据。最常见和推荐的方式是使用 Server-Sent Events (SSE) 或直接处理 JSON 数组流。
以下是几种主要的前端接收方式:
这是处理 Flux
流式数据最自然、最高效的方式。SSE 允许服务器向浏览器单向推送多个事件。
需要在返回 Flux
的方法上添加 produces = MediaType.TEXT_EVENT_STREAM_VALUE
,并确保返回的是 Flux<T>
。
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class StreamingController {
// 示例:发送实时的数字流
@GetMapping(value = "/api/chat/qa", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> streamNumbers(String query) {
return Flux.interval(Duration.ofSeconds(1)) // 每秒发出一个数字
.map(Long::intValue)
.take(10); // 只发送10个
}
// 示例:发送用户数据流
@GetMapping(value = "/api/users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.getAllUsersAsStream(); // 假设这是一个Flux<User>
}
}
EventSource
)<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>AI 聊天</title>
</head>
<style>
body{padding: 16px;}
#output{
padding: 20px;
border: solid 1px #ddd;
}
#operation{
margin-top: 16px;
}
#query{
height: 20px;
padding: 4px;
border: 1px solid #ddd;
}
#query:focus,#query:active,#query:hover{
border: 1px solid #ddd;
}
#sendBtn{
height: 30px;
padding: 4px 16px;
border: 1px solid #ddd;
margin: 0 8px;
}
</style>
<body>
<div id="output"></div>
<div id="operation">
<input type="text" placeholder="请输入你的问题..." id="query"><button id="sendBtn" type="button">发送</button>
</div>
<script src="marked.min.js"></script>
<script src="markdown-it.min.js"></script>
<script>
document.getElementById('sendBtn').addEventListener('click', send)
document.getElementById('query').addEventListener('keydown', function(event) {
if (event.key === 'Enter') {
send();
}
})
let markdownContent = '';
function send(){
//每次按钮清空
markdownContent = '';
const query = document.getElementById('query').value;
try{
// 创建 EventSource 连接
const eventSource = new EventSource('/api/chat/qa?query='+encodeURIComponent(query));
let isCompleted = false; // 标记是否已处理完成
// 监听消息事件 (SSE 默认事件)
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data); // 如果是JSON数据(与后端协商返回格式)
console.log('Received:', data);
if (data.body.finishReason != null) {//结束标记(与后端商量)
console.log('markdownContent:',markdownContent)
console.log('✅ 服务器通知:流已完成 finishReason',data.body.finishReason);
isCompleted = true;
eventSource.close(); // 主动关闭 / 彻底关闭重连 ✅ 调用 eventSource.close()
//每次换行
document.getElementById('output').innerHTML +='<br/><br/>'
//清空数据
markdownContent = '';
return;
}
//清空
// 更新UI,例如将数字添加到页面
if (data.body.data.includes('<think>')){
markdownContent += '<br/><br/>'+`<think><br/>`;
}else if (data.body.data.includes('</think>')){
markdownContent +=`<br/></think>`+'<br/><br/>';
}else{
markdownContent += `${data.body.data}`;
}
// 使用 marked 将Markdown转换为HTML
const htmlContent = marked.parse(cleanMarkdown(markdownContent));
document.getElementById('output').innerHTML=htmlContent;
// 滚动到底部
const container = document.body;
container.scrollTop = container.scrollHeight;
};
// 监听连接打开
eventSource.onopen = function(event) {
console.log('SSE connection opened.');
isCompleted = false;
};
// 监听错误
eventSource.onerror = function(event) {
console.error('SSE error:', event);
if (eventSource.readyState === EventSource.CLOSED) {
// 连接已关闭
if (isCompleted) {
console.log('✅ 流已正常完成并关闭');
} else {
console.error('❌ 流因错误而关闭');
// 可以尝试重连,或提示用户
}
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('🔁 正在尝试重新连接...');
// EventSource 会自动重连
} else {
console.error('❌ 发生错误,连接状态:', eventSource.readyState);
}
};
// 可以在需要时关闭连接
// eventSource.close();
}catch (e){
throw e ;
}
document.getElementById('query').value = ''
}
function cleanMarkdown(text) {
return text
.trim() // 去除首尾空白
.replace(/\r\n/g, '\n'); // 统一换行符
}
</script>
</body>
</html>
优点:自动处理连接、重连;浏览器原生支持;适合实时数据推送(如通知、日志、股票行情)。 缺点:只支持服务器到客户端的单向通信;IE 不支持。
Spring WebFlux 也支持将 Flux<T>
序列化为一个“流式 JSON 数组”。数据会以 chunked
方式分块传输,前端可以逐步接收。
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class BatchController {
// 返回一个大的数据流,Spring 会自动分块发送
@GetMapping("/api/large-list")
public Flux<Item> getLargeItemList() {
return itemService.getLargeStream(); // 返回 Flux<Item>
}
}
fetch
+ ReadableStream)
async function fetchFluxStream() {
const response = await fetch('/api/large-list');
if (!response.body) {
throw new Error('ReadableStream not supported');
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = ''; // 用于拼接不完整的JSON片段
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 尝试解析 buffer 中的完整 JSON 对象
// 这里需要根据你的数据格式进行解析,例如以换行分隔
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一行可能是不完整的
for (const line of lines) {
if (line.trim()) {
try {
const item = JSON.parse(line);
console.log('Received item:', item);
// 更新UI
} catch (err) {
console.warn('Failed to parse JSON:', line);
}
}
}
}
// 处理最后可能剩下的不完整数据
if (buffer.trim()) {
try {
const item = JSON.parse(buffer);
console.log('Final item:', item);
} catch (err) {
console.warn('Incomplete final data:', buffer);
}
}
}
fetchFluxStream();
优点:可以传输大量数据而不会阻塞;节省内存。 缺点:解析逻辑复杂,需要处理分块和JSON拼接;不如SSE直观。
如果需要更复杂的双向交互,可以使用 WebSocket 配合 Flux
。
WebSocketHandler
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// 创建一个数据流
Flux<String> dataStream = Flux.interval(Duration.ofSeconds(2))
.map(seq -> "Message " + seq);
// 将数据流发送给客户端
return session.send(dataStream.map(session::textMessage));
}
}
const socket = new WebSocket('ws://localhost:8080/ws/data');
socket.onopen = function(event) {
console.log('WebSocket connected');
};
socket.onmessage = function(event) {
console.log('Received:', event.data);
// 更新UI
};
socket.onerror = function(error) {
console.error('WebSocket error:', error);
};
适用场景:聊天应用、实时协作、游戏等需要双向通信的场景。
方式 | 适用场景 | 前端技术 | 推荐度 |
---|---|---|---|
SSE (Server-Sent Events) | 服务器向客户端推送实时数据流(单向) | EventSource API |
⭐⭐⭐⭐⭐ |
Fetch + ReadableStream | 获取大型数据集,分块加载 | fetch() + ReadableStream |
⭐⭐⭐⭐ |
WebSocket | 需要双向实时通信 | WebSocket API |
⭐⭐⭐⭐ (特定场景) |
对于大多数 Flux
流式数据场景,推荐使用 SSE,因为它简单、高效,并且与 Flux
的流式特性完美匹配。
https://blog.xqlee.com/article/2509010851292299.html