https://mp.weixin.qq.com/s/zMy77yq3jcoinQSJGn-O3w
认识 SSE:Server - Sent Events 到底是何方神圣
在深入学习 Spring Boot SSE 推送之前,我们先来全面认识一下 Server - Sent Events(SSE)。简单来说,SSE 是一种基于 HTTP 协议实现服务器向客户端单向实时推送数据的技术。在传统的 HTTP 请求 - 响应模式中,客户端发起请求,服务器返回响应后连接通常就会关闭,若客户端想要获取新数据,必须再次发起请求。而 SSE 打破了这种常规模式,它通过建立持久化的 HTTP 长连接,让服务器能够主动地将数据实时推送给客户端,无需客户端频繁发起请求询问是否有新数据。  SSE 的工作原理并不复杂,客户端通过创建一个 EventSource 对象向服务器发起一个特殊的 HTTP GET 请求,这个请求头包含Accept: text/event-stream,明确告知服务器客户端期望接收的是事件流数据。服务器接收到请求后,会设置特定的响应头,像Content-Type: text/event-stream(表明返回的数据是事件流格式)、Cache-Control: no-cache(防止数据被缓存,确保客户端获取到的始终是最新数据)和Connection: keep-alive(保持连接不断开,实现长连接效果)。之后,服务器就可以按照 SSE 规定的数据格式,持续向客户端发送数据。服务器发送的数据以事件流的形式呈现,每条消息以data:开头,后面跟着实际的数据内容,消息结束时用两个换行符\n\n分隔。例如:data: {"message": "这是一条实时推送的数据"}\n\n。  与其他实时通信技术相比,SSE 有着鲜明的特点和优势。拿它与 WebSocket 对比,WebSocket 是一种全双工通信协议,支持客户端和服务器双向实时通信,适用于需要频繁双向交互的场景,如多人在线游戏、实时协作编辑文档等,在这些场景中,双方都需要实时向对方发送和接收数据。而 SSE 是单向通信,仅支持服务器向客户端推送数据,但它基于 HTTP 协议,实现起来更为简单,对现有网络基础设施(如防火墙、代理)的兼容性更好,非常适合服务器向客户端单向推送实时数据的场景,比如实时通知(像社交媒体的新消息提醒、电商平台的订单状态更新通知)、实时数据流展示(如股票行情、新闻资讯实时更新)以及仪表盘数据实时监控等。此外,SSE 还具备自动重连机制,当连接意外断开时,浏览器端的 EventSource API 会自动尝试重新连接服务器,开发者还能通过事件流中的retry字段由服务器端控制重连的时间间隔,这大大增强了数据传输的稳定性和可靠性。 在实际应用中,SSE 已经在诸多领域发挥着重要作用。在金融领域,股票交易平台利用 SSE 实时推送股票价格走势、交易数据等信息,让投资者能够第一时间掌握市场动态,做出精准的投资决策;新闻媒体行业通过 SSE 将最新的新闻资讯、热点事件实时推送给用户,确保用户获取信息的及时性;在物联网(IoT)场景中,SSE 可用于实时传输传感器数据,实现对设备状态的实时监控和管理。可以说,只要是存在服务器向客户端单向实时推送数据需求的场景,SSE 都能大显身手。
实战演练:Spring Boot 实现 SSE 推送本文基于 WebFlux,详细介绍了 Spring Boot 实现 SSE 推送的多种方法。若想基于 WebMVC 实现,可使用org.springframework.web.servlet.mvc.method.annotation.SseEmitter。 基于定时器实现 SSE 推送
1、引入依赖:在pom.xml文件中添加spring-boot-starter-webflux依赖,为项目引入响应式编程的支持,让我们能够方便地实现 SSE 推送。代码如下: <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2、编写控制器:创建一个控制器类,比如SseController,在其中编写一个方法用于定时发送数据。这里使用Flux.interval来实现定时功能,它会按照指定的时间间隔生成一个序列。每生成一个序列值,就通过ServerSentEvent.builder构建一个包含递增整数数据的ServerSentEvent对象并发送出去,当发送的数据值大于 10 时停止发送。具体代码如下: package cn.pottercoding.controller;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
@RestController
public class SseController {
private final AtomicInteger counter = new AtomicInteger(0);
@CrossOrigin(origins = "*", allowedHeaders = "*", methods = {RequestMethod.GET})
@GetMapping(path = "/interval/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + "; charset=UTF-8")
public Flux<ServerSentEvent<Integer>> streamSseMvc() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<Integer>builder()
.data(counter.incrementAndGet())
.build())
.takeUntil(event -> event.data() > 10)
.doOnComplete(() -> System.out.println("complete"));
}
}
3、搭建 SSE 客户端页面:创建一个简单的 HTML 页面,用于接收服务器推送的数据。在页面中,通过EventSource对象建立与服务器的连接,当接收到服务器推送的数据时,将数据显示在页面上;如果连接出错,在控制台打印错误信息并关闭连接。HTML 页面代码如下: <!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Example</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('http://localhost:8080/interval/stream');
eventSource.onmessage = function (event) {
const messagesDiv = document.getElementById('messages');
const newMessage = document.createElement('p');
newMessage.textContent = `Received: ${event.data}`;
messagesDiv.appendChild(newMessage);
};
eventSource.onerror = function (error) {
console.error('EventSource failed:', error);
eventSource.close();
};
</script>
</body>
</html>
4、测试:启动 Spring Boot 应用,然后在浏览器中访问上述 HTML 页面。正常情况下,页面会每秒接收并显示服务器推送的递增整数,直到接收到的数据值大于 10 停止。
基于发布订阅模式实现 SSE 推送Spring Boot 2.4 之前版本
在 Spring Boot 2.4 之前的版本中,实现发布订阅模式主要利用FluxProcessor,它有多种实现类,其中UnicastProcessor、DirectProcessor、ReplayProcessor较为常用。 UnicastProcessor:适用于仅有一个订阅者的场景。比如在一个单用户的实时监控系统中,只需要向特定的一个客户端推送实时数据更新,使用UnicastProcessor就能避免资源浪费,高效地完成数据推送。 DirectProcessor:适合存在多个订阅者的场景。以一个实时消息通知系统为例,多个用户都需要接收相同的实时消息,DirectProcessor可以将生产者发布的消息直接分发给所有订阅者,确保每个订阅者都能及时获取到最新消息。 ReplayProcessor:适合需要向新订阅者重播之前消息的场景。在股票行情实时展示应用中,新用户进入系统时,可能需要获取过去一段时间的股票价格走势等历史消息,ReplayProcessor就可以将之前发布的消息重新发送给新订阅者。
下面以DirectProcessor为例,详细讲解实现步骤: 1、创建发布订阅管理类:创建一个类,如DirectProcessorSsePublisherService,实现SsePublisherService接口(若没有此接口,可自行定义消息发布和订阅相关方法)。在这个类中,创建一个DirectProcessor对象用于处理消息发布与订阅,再通过processor.replay().autoConnect()创建一个可自动连接的Flux对象,以便订阅者能够接收到消息。外部类可以通过getMessages方法订阅消息流,通过publishMessage方法发布新消息,通过complete方法结束消息流。代码如下: import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
public class DirectProcessorSsePublisherService implements SsePublisherService {
private final DirectProcessor<String> processor = DirectProcessor.create();
private final Flux<String> flux = processor.replay().autoConnect();
@Override
public Flux<String> getMessages() {
return flux;
}
@Override
public void publishMessage(String message) {
processor.onNext(message);
}
@Override
public void complete() {
processor.onComplete();
}
}
2、设置定时器生产数据:创建一个定时任务类,如ProduceDataTask,使用@Component注解将其纳入 Spring 容器管理。通过@Autowired注入SsePublisherService实例,在run方法中使用@Scheduled注解设置定时任务,每 5 秒生成一个递增的整数,并将包含该整数的消息通过SsePublisherService发布出去,当生成的整数大于 10 时,结束消息发布。代码如下: import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ProduceDataTask {
@Autowired
private SsePublisherService ssePublisherService;
private final AtomicInteger counter = new AtomicInteger(0);
@Scheduled(fixedRate = 5000)
public void run() {
int num = counter.incrementAndGet();
System.out.println("num = " + num);
ssePublisherService.publishMessage("hello-" + num);
if (num > 10) {
ssePublisherService.complete();
}
}
}
3、编写消费控制器:创建一个控制器类,如SseConsumerController,在其中编写一个方法用于接收并处理发布的消息。通过@GetMapping注解映射请求路径,返回Flux<ServerSentEvent<String>>类型的数据,将从SsePublisherService获取的消息流转换为ServerSentEvent事件流发送给客户端。代码如下: import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class SseConsumerController {
private final SsePublisherService ssePublisherService;
public SseConsumerController(SsePublisherService ssePublisherService) {
this.ssePublisherService = ssePublisherService;
}
@GetMapping(path = "/pubsub/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + "; charset=UTF-8")
public Flux<ServerSentEvent<String>> consumeSse() {
return ssePublisherService.getMessages()
.map(message -> ServerSentEvent.builder()
.data(message)
.build());
}
}
4、搭建 SSE 客户端页面:此页面与基于定时器实现 SSE 推送中的客户端页面类似,同样通过EventSource对象建立与服务器的连接,接收并展示服务器推送的数据。代码如下: <!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Pub/Sub Example</title>
</head>
<body>
<div id="pubsub-messages"></div>
<script>
const eventSource = new EventSource('/pubsub/stream');
eventSource.onmessage = function (event) {
const messagesDiv = document.getElementById('pubsub-messages');
const newMessage = document.createElement('p');
newMessage.textContent = `Received: ${event.data}`;
messagesDiv.appendChild(newMessage);
};
eventSource.onerror = function (error) {
console.error('EventSource failed:', error);
eventSource.close();
};
</script>
</body>
</html>
5、测试:启动 Spring Boot 应用,访问上述 HTML 页面,页面会每 5 秒接收并显示服务器发布的包含递增整数的消息,直到接收到的数据值大于 10 停止。 
Spring Boot 2.4 及之后版本
在 Spring Boot 2.4 及之后的版本中,引入了Sinks来管理发布订阅。Sinks相较于之前版本利用FluxProcessor实现发布订阅模式,在功能和使用方式上有一些核心要点和差异。Sinks提供了更简洁、更灵活的 API 来处理响应式流的发布和订阅。它支持多种类型的Sink,如Sinks.Many用于多播场景(类似DirectProcessor适用于多个订阅者的情况),Sinks.One用于单播场景(类似UnicastProcessor适用于单个订阅者的情况)。与FluxProcessor相比,Sinks在性能和资源管理上进行了优化,尤其是在高并发场景下,能够更高效地处理大量的消息发布和订阅请求。 以Sinks.Many为例,实现步骤如下: 1、创建发布订阅管理类:创建一个类,如SinksSsePublisherService,在其中创建一个Sinks.Many<String>对象用于消息发布。通过asFlux方法将Sinks.Many转换为Flux,以便订阅者订阅消息。提供publishMessage方法用于发布消息,通过tryEmitNext方法尝试将消息发送给订阅者。代码如下: import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class SinksSsePublisherService {
private final Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
public Flux<String> getMessages() {
return sink.asFlux();
}
public void publishMessage(String message) {
sink.tryEmitNext(message);
}
}
2、 设置定时器生产数据:定时任务类与 Spring Boot 2.4 之前版本类似,同样使用@Component和@Scheduled注解,注入SinksSsePublisherService实例,定时生成数据并发布。代码如下: import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class SinksProduceDataTask {
@Autowired
private SinksSsePublisherService ssePublisherService;
private final AtomicInteger counter = new AtomicInteger(0);
@Scheduled(fixedRate = 5000)
public void run() {
int num = counter.incrementAndGet();
System.out.println("num = " + num);
ssePublisherService.publishMessage("hello-" + num);
if (num > 10) {
// 可根据Sinks的特性,选择合适的方式结束消息发布,这里简单示例
ssePublisherService.publishMessage("completed");
}
}
}
3、编写消费控制器:控制器类也与之前版本类似,通过@GetMapping注解映射请求路径,返回Flux<ServerSentEvent<String>>类型的数据,将从SinksSsePublisherService获取的消息流转换为ServerSentEvent事件流发送给客户端。代码如下: import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class SinksSseConsumerController {
private final SinksSsePublisherService ssePublisherService;
public SinksSseConsumerController(SinksSsePublisherService ssePublisherService) {
this.ssePublisherService = ssePublisherService;
}
@GetMapping(path = "/sinks/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + "; charset=UTF-8")
public Flux<ServerSentEvent<String>> consumeSse() {
return ssePublisherService.getMessages()
.map(message -> ServerSentEvent.builder()
.data(message)
.build());
}
}
4、搭建 SSE 客户端页面:与之前基于发布订阅模式(Spring Boot 2.4 之前版本)的客户端页面代码相同,通过EventSource对象接收服务器推送的数据。 5、测试:启动应用并访问客户端页面,观察页面是否能按预期接收并显示服务器推送的数据。
常见问题与解决方案
在学习 Spring Boot SSE 推送的过程中,难免会遇到各种问题,下面为大家总结一些常见问题及对应的解决方案 : 连接超时:在 SSE 推送中,由于长连接的特性,若网络不稳定或服务器负载过高,可能出现连接超时问题。这会导致客户端无法正常接收服务器推送的数据,影响实时交互体验。比如在高并发场景下,大量客户端同时与服务器建立 SSE 连接,服务器资源紧张,就容易出现连接超时。解决方案是合理设置连接超时时间,在 Spring Boot 中,可以通过配置ServerProperties来调整。在application.properties(或application.yml)文件中添加配置:server.servlet.connection-timeout=10000(单位是毫秒,这里设置为 10 秒,表示如果 10 秒内没有数据传输,连接将被关闭)。同时,优化服务器性能,如增加服务器资源(CPU、内存等),采用负载均衡技术,将请求均匀分配到多个服务器实例上,减轻单个服务器的压力。 数据乱序:虽然 SSE 本身保证数据按顺序发送,但在复杂的网络环境或高并发情况下,也可能出现数据乱序到达客户端的情况。这会导致客户端展示的数据逻辑混乱,影响用户对数据的正确理解。例如,在网络波动较大时,数据包可能会因为路由等原因导致到达顺序异常。为了解决这个问题,可以在数据中添加唯一标识或时间戳。在服务器端发送数据时,为每个数据添加一个唯一的序列号或者时间戳。客户端在接收数据后,根据这些标识对数据进行排序,确保按照正确的顺序展示。比如在控制器发送数据时:
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + "; charset=UTF-8")
public Flux<ServerSentEvent<MyData>> streamSse() {
return Flux.fromIterable(dataList)
.map(data -> {
// 假设MyData类有id和timestamp字段
data.setId(generateUniqueId());
data.setTimestamp(System.currentTimeMillis());
return ServerSentEvent.builder()
.data(data)
.build();
});
}
在客户端的 JavaScript 代码中,根据时间戳排序: const receivedData = [];
eventSource.onmessage = function (event) {
const data = JSON.parse(event.data);
receivedData.push(data);
receivedData.sort((a, b) => a.timestamp - b.timestamp);
// 重新展示数据
updateUI(receivedData);
};
- 消息丢失:当服务器负载过高、网络不稳定或代码逻辑存在问题时,可能会出现消息丢失的情况。这意味着客户端无法接收到服务器发送的部分数据,导致数据不完整。比如在服务器短时间内产生大量要推送的数据,而网络带宽有限,可能会导致部分数据丢失。为了避免消息丢失,可以采用消息持久化和重发机制。使用消息队列(如 Kafka、RabbitMQ)来存储待推送的消息,即使服务器出现故障或网络波动,消息也不会丢失。同时,在客户端添加重连机制,当检测到连接断开时,自动重新连接服务器,并请求未接收的消息。以使用 RabbitMQ 为例,在 Spring Boot 项目中添加 RabbitMQ 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 RabbitMQ 连接信息,在application.properties文件中: spring.rabbitmq.host=your-rabbitmq-host
spring.rabbitmq.port=5672
spring.rabbitmq.username=your-username
spring.rabbitmq.password=your-password
在服务器端,将 SSE 推送的数据先发送到 RabbitMQ 队列: import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SseDataSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendData(String data) {
rabbitTemplate.convertAndSend("your-exchange-name", "your-routing-key", data);
}
}
在客户端,通过EventSource重连机制和从队列获取未接收消息的逻辑来保证消息完整性。 通过对这些常见问题的认识和掌握相应的解决方案,能让我们在学习和应用 Spring Boot SSE 推送技术时更加顺畅,避免因这些问题导致的开发进度受阻和应用性能下降。
本文总结通过本次学习,我们深入了解了 Spring Boot SSE 推送技术。从 AI 爆火场景中 SSE 的重要作用切入,认识到 SSE 在实时交互领域的关键价值。接着剖析 SSE 概念、原理及与其他实时通信技术的差异,掌握其基于 HTTP 协议实现单向实时推送的核心特点。在实战环节,我们通过基于定时器和发布订阅模式(涵盖 Spring Boot 2.4 前后版本不同实现方式)实现 SSE 推送,积累了丰富的实践经验,也学会了如何解决连接超时、数据乱序、消息丢失等常见问题。
|