Spring WebFlux 返回 Flux 前端JavaScript配合接收

编程教程 > Java > Spring (5) 2025-09-01 12:41:57

在 Spring WebFlux 中,当 Controller 返回 Flux 时,前端可以通过多种方式接收这个异步、流式的数据。最常见和推荐的方式是使用 Server-Sent Events (SSE) 或直接处理 JSON 数组流。

以下是几种主要的前端接收方式:


方式一:Server-Sent Events (SSE) - 推荐用于实时流

这是处理 Flux 流式数据最自然、最高效的方式。SSE 允许服务器向浏览器单向推送多个事件。

1. 后端 Controller 配置

需要在返回 Flux 的方法上添加 produces = MediaType.TEXT_EVENT_STREAM_VALUE,并确保返回的是 Flux<T>

 

import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class StreamingController {
    // 示例:发送实时的数字流
    @GetMapping(value = "/api/chat/qa", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> streamNumbers(String query) {
        return Flux.interval(Duration.ofSeconds(1)) // 每秒发出一个数字
                   .map(Long::intValue)
                   .take(10); // 只发送10个
    }
    // 示例:发送用户数据流
    @GetMapping(value = "/api/users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return userService.getAllUsersAsStream(); // 假设这是一个Flux<User>
    }
}

2. 前端 JavaScript 接收 (使用 EventSource)

Spring WebFlux 返回 Flux 前端JavaScript配合接收_图示-80337006c9b04518b5a1b5534209c362.png
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>AI 聊天</title>
</head>
<style>
 body{padding: 16px;}
 #output{
     padding: 20px;
     border: solid 1px #ddd;
}
 #operation{
     margin-top: 16px;
 }
 #query{
     height: 20px;
     padding: 4px;
     border: 1px solid #ddd;
 }
 #query:focus,#query:active,#query:hover{
     border: 1px solid #ddd;
 }
 #sendBtn{
     height: 30px;
     padding: 4px 16px;
     border: 1px solid #ddd;
     margin: 0 8px;
 }
</style>
<body>

<div id="output"></div>
<div id="operation">
    <input type="text" placeholder="请输入你的问题..." id="query"><button id="sendBtn" type="button">发送</button>
</div>
<script src="marked.min.js"></script>
<script src="markdown-it.min.js"></script>
<script>

  document.getElementById('sendBtn').addEventListener('click', send)
  document.getElementById('query').addEventListener('keydown', function(event) {
      if (event.key === 'Enter') {
        send();
      }
  })


  let markdownContent = '';

  function send(){
    //每次按钮清空
    markdownContent = '';
    const query = document.getElementById('query').value;
    try{
      // 创建 EventSource 连接
      const eventSource = new EventSource('/api/chat/qa?query='+encodeURIComponent(query));
      let isCompleted = false; // 标记是否已处理完成
      // 监听消息事件 (SSE 默认事件)
      eventSource.onmessage = function(event) {
        const data = JSON.parse(event.data); // 如果是JSON数据(与后端协商返回格式)
        console.log('Received:', data);
        if (data.body.finishReason != null) {//结束标记(与后端商量)
            console.log('markdownContent:',markdownContent)
          console.log('✅ 服务器通知:流已完成 finishReason',data.body.finishReason);
          isCompleted = true;
          eventSource.close(); // 主动关闭 / 彻底关闭重连	✅ 调用 eventSource.close()
            //每次换行
            document.getElementById('output').innerHTML +='<br/><br/>'
            //清空数据
            markdownContent = '';
          return;
        }
        //清空
        // 更新UI,例如将数字添加到页面
          if (data.body.data.includes('<think>')){
              markdownContent += '<br/><br/>'+`&lt;think&gt;<br/>`;
          }else if (data.body.data.includes('</think>')){
              markdownContent +=`<br/>&lt;/think&gt;`+'<br/><br/>';
          }else{
              markdownContent += `${data.body.data}`;
          }
          // 使用 marked 将Markdown转换为HTML
          const htmlContent = marked.parse(cleanMarkdown(markdownContent));
          document.getElementById('output').innerHTML=htmlContent;

        // 滚动到底部
        const container = document.body;
        container.scrollTop = container.scrollHeight;
      };
      // 监听连接打开
      eventSource.onopen = function(event) {
        console.log('SSE connection opened.');
        isCompleted = false;
      };
      // 监听错误
      eventSource.onerror = function(event) {
        console.error('SSE error:', event);
        if (eventSource.readyState === EventSource.CLOSED) {
          // 连接已关闭
          if (isCompleted) {
            console.log('✅ 流已正常完成并关闭');
          } else {
            console.error('❌ 流因错误而关闭');
            // 可以尝试重连,或提示用户
          }
        } else if (eventSource.readyState === EventSource.CONNECTING) {
          console.log('🔁 正在尝试重新连接...');
          // EventSource 会自动重连
        } else {
          console.error('❌ 发生错误,连接状态:', eventSource.readyState);
        }
      };
      // 可以在需要时关闭连接
      // eventSource.close();
    }catch (e){
      throw e ;
    }
      document.getElementById('query').value = ''
  }

  function cleanMarkdown(text) {
      return text
          .trim()                    // 去除首尾空白
          .replace(/\r\n/g, '\n');   // 统一换行符
  }
</script>
</body>
</html>

优点:自动处理连接、重连;浏览器原生支持;适合实时数据推送(如通知、日志、股票行情)。 缺点:只支持服务器到客户端的单向通信;IE 不支持。


方式二:直接返回 JSON 数组流 (Chunked Transfer Encoding)

Spring WebFlux 也支持将 Flux<T> 序列化为一个“流式 JSON 数组”。数据会以 chunked 方式分块传输,前端可以逐步接收。

1. 后端 Controller

 

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class BatchController {
    // 返回一个大的数据流,Spring 会自动分块发送
    @GetMapping("/api/large-list")
    public Flux<Item> getLargeItemList() {
        return itemService.getLargeStream(); // 返回 Flux<Item>
    }
}

2. 前端 JavaScript 接收 (使用 fetch + ReadableStream)

 

async function fetchFluxStream() {
    const response = await fetch('/api/large-list');
    
    if (!response.body) {
        throw new Error('ReadableStream not supported');
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = ''; // 用于拼接不完整的JSON片段
    while (true) {
        const { done, value } = await reader.read();
        
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // 尝试解析 buffer 中的完整 JSON 对象
        // 这里需要根据你的数据格式进行解析,例如以换行分隔
        const lines = buffer.split('\n');
        buffer = lines.pop(); // 最后一行可能是不完整的
        for (const line of lines) {
            if (line.trim()) {
                try {
                    const item = JSON.parse(line);
                    console.log('Received item:', item);
                    // 更新UI
                } catch (err) {
                    console.warn('Failed to parse JSON:', line);
                }
            }
        }
    }
    // 处理最后可能剩下的不完整数据
    if (buffer.trim()) {
        try {
            const item = JSON.parse(buffer);
            console.log('Final item:', item);
        } catch (err) {
            console.warn('Incomplete final data:', buffer);
        }
    }
}
fetchFluxStream();

优点:可以传输大量数据而不会阻塞;节省内存。 缺点:解析逻辑复杂,需要处理分块和JSON拼接;不如SSE直观。


方式三:使用 WebSocket (双向通信)

如果需要更复杂的双向交互,可以使用 WebSocket 配合 Flux

1. 后端配置 WebSocketHandler

 

@Component
public class MyWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 创建一个数据流
        Flux<String> dataStream = Flux.interval(Duration.ofSeconds(2))
                                     .map(seq -> "Message " + seq);
        // 将数据流发送给客户端
        return session.send(dataStream.map(session::textMessage));
    }
}

2. 前端 JavaScript

 

const socket = new WebSocket('ws://localhost:8080/ws/data');
socket.onopen = function(event) {
    console.log('WebSocket connected');
};
socket.onmessage = function(event) {
    console.log('Received:', event.data);
    // 更新UI
};
socket.onerror = function(error) {
    console.error('WebSocket error:', error);
};

适用场景:聊天应用、实时协作、游戏等需要双向通信的场景。


总结

方式 适用场景 前端技术 推荐度
SSE (Server-Sent Events) 服务器向客户端推送实时数据流(单向) EventSource API ⭐⭐⭐⭐⭐
Fetch + ReadableStream 获取大型数据集,分块加载 fetch() + ReadableStream ⭐⭐⭐⭐
WebSocket 需要双向实时通信 WebSocket API ⭐⭐⭐⭐ (特定场景)

对于大多数 Flux 流式数据场景,推荐使用 SSE,因为它简单、高效,并且与 Flux 的流式特性完美匹配。


评论
User Image
提示:请评论与当前内容相关的回复,广告、推广或无关内容将被删除。

相关文章
Flux 和 Mono 区别说明Flux 和 Mono 是 Spring WebFlux 框架中用于响应式编程的核心类型,它们都来自于 Project Reac
在 Spring WebFlux 中,当 Controller 返回 Flux 时,前端可以通过多种方式接收这个异步、流式的数据。最常见和推荐的方式是使用 Se
Spring WebFlux 项目实战 在Spring WebFlux中创建多个RouterFunctions,在这篇文章中,我们将着眼于在Spring WebFlux中将多个路由器功能定义到不...
Spring WebFlux,spring框架5.0将会新增的web增强框架,这里主要讲述什么是Spring WebFlux以及Spring WebFlux的新功能,Spring WebFlux...
Spring WebFlux入门程序hello word。本文主要在于讲解如何创建和运行spring webflux入门程序hello word。其实不难发现和spring mvc相比代码层基本...
spring boot webflux client实战,webclient是spring webflux的一个小组件。对于Java的http通讯来说,webclient是非常简单易用的。
1.引言Spring 5通过引入一种名为Spring WebFlux的全新反应框架来支持响应式编程范例
引言Spring Boot 2.0最近去了GA,所以我决定写我关于Spring的第一篇文章很长一段时间
1.引言Spring开发人员,您是否曾经觉得需要一个易于使用且高效的流畅功能样式 API 的异步/非阻塞 HTTP客户端?如果是,那么我欢迎您阅读关于WebClient的文章,WebClient...
从Spring 6和Spring Boot 3开始,Spring framework支持将远程HTTP服务代理为带有HTTP交换注解方法的Java接口。类似的库,如OpenFeign和Retro...
Spring Boot 2.0 有哪些新特性_Spring Boot 2.0新功能,在本文中,我们将探讨为Spring Boot 2.0计划的一些更改和功能。我们还会描述这些变化如何帮助我们提高...
Spring Context 与Spring MVC Context那些坑
Spring框架Spring IoC容器的核心原理,前三篇已经从历史的角度和大家一起探讨了为什么会有Spring,Spring的两个核心概念:IoC和AOP的雏形,Spring的历史变迁和如今的...
Spring Boot 2.0,Spring框架的Spring Boot 中的Spring Boot Actuator变化讲解。并且了解如何在Spring Boot 2.0中使用Actuator...
java编程中spring框架5.0介绍说明/概述,spring5,spring框架,java编程