参考:https://zhuanlan.zhihu.com/p/258097038 https://lawrenceli.me/blog/websocket-cluster#%E7%AE%80%E5%8D%95%E5%B9%BF%E6%92%AD%E5%AE%9E%E7%8E%B0-websocket-%E9%9B%86%E7%BE%A4
单例WebSocket回顾 示例代码没有使用 Spring Boot 集成,是使用 javax.websocket API 实现的单例 WebSocket 应用。这是一个简单的例子,实际使用上可以添加心跳检测机制、异步广播、清理无效会话等功能;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import javax.websocket.*;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.concurrent.ConcurrentHashMap;@ServerEndpoint("/chat") public class SimpleWebSocketServer { private static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap <>(); @OnOpen public void onOpen (Session session) { sessions.put(session.getId(), session); System.out.println("新客户端连接: " + session.getId()); sendMessage(session, "欢迎加入聊天室! 你的ID: " + session.getId()); broadcast("系统消息: 用户 " + session.getId() + " 加入聊天室" ); } @OnMessage public void onMessage (String message, Session sender) { System.out.println("收到消息 [" + sender.getId() + "]: " + message); broadcast("用户 " + sender.getId() + ": " + message); } @OnClose public void onClose (Session session, CloseReason reason) { sessions.remove(session.getId()); System.out.println("客户端断开: " + session.getId() + ", 原因: " + reason.getReasonPhrase()); broadcast("系统消息: 用户 " + session.getId() + " 离开聊天室" ); } @OnError public void onError (Session session, Throwable throwable) { System.err.println("连接错误: " + session.getId()); throwable.printStackTrace(); } private void broadcast (String message) { sessions.forEach((id, session) -> { if (session.isOpen()) { sendMessage(session, message); } }); } private void sendMessage (Session session, String message) { try { session.getAsyncRemote().sendText(message); } catch (Exception e) { System.err.println("发送消息失败: " + e.getMessage()); } } }
重写的方法:
onOpen:在客户端与WebSocket服务连接时触发方法执行
onClose:在客户端与WebSocket连接断开的时候触发执行
onMessage:在接收到客户端发送的消息时触发执行
onError:在发生错误时触发执行
流程:
客户端与服务端建立 TCP 连接,使用 HTTP 协议升级为 WebSocket 协议连接到服务端的 /chat 端点,触发 onOpen 方法;
客户端以 WebSocket 数据帧的形式发送消息;
服务端的 onMessage 方法被触发;
服务端使用 WebSocket Session 进行数据发送;
若客户端主动下线,触发 onClose;若通信时网络中断触发 onError;
单例和分布式WebSocket的区别 一图流:
对比维度
单例环境
分布式环境
问题根源
连接管理
单节点内存维护所有连接(如 ConcurrentHashMap)
连接分散在多个节点,无全局视图
WebSocket 的 Session 基于 TCP 连接,无法跨节点序列化或共享
消息路由
直接遍历本地连接推送消息
需跨节点定位目标连接,否则部分用户收不到消息
用户连接可能分布在任意节点,发送方节点无法直接访问接收方连接
负载均衡
无需考虑连接分布
新增/减少节点时连接分配不均(如轮询算法)
持久连接无法像 HTTP 请求那样动态重分配到新节点
会话状态
Session 绑定到本地内存
Session 状态需外部存储(如 Redis)
WebSocket Session 包含 TCP 连接信息,无法在集群间迁移
分析: WebSocket 是有状态的协议,每个连接是绑定到特定服务器节点的。
单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。
换成分布式系统后,如果客户端连接的服务器发生故障或负载均衡切换,其他节点无法获取该连接的状态。
如:客服系统单机部署时,用户A与客服B连接同一节点,消息可正常推送;分布式部署后,若用户A连接节点1,客服B连接节点2,则用户A的消息无法到达客服B。
单机应用下不会出现通信问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 sequenceDiagram title 单机部署 - 消息正常推送 participant 用户A as 用户A participant 节点 as 节点1 participant 客服B as 客服B rect rgba(0, 128, 0, 0.1) 用户A->>节点: 连接请求 节点-->>用户A: 连接成功 客服B->>节点: 连接请求 节点-->>客服B: 连接成功 用户A->>节点: 发送消息给客服B 节点->>节点: 查找本地连接<br/>找到客服B会话 节点->>客服B: 推送消息 客服B-->>节点: 接收成功 end
分布式下出现通信问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 sequenceDiagram title 分布式部署 - 消息无法送达 participant 用户A as 用户A participant LB as 负载均衡器 participant 节点1 as 节点1 participant 节点2 as 节点2 participant 客服B as 客服B rect rgba(128, 0, 0, 0.1) 用户A->>LB: 连接请求 LB->>节点1: 分配用户A 节点1->>用户A: 连接成功 客服B->>LB: 连接请求 LB->>节点2: 分配客服B 节点2->>客服B: 连接成功 用户A->>节点1: 发送消息给客服B 节点1->>节点1: 查找本地连接<br/>未找到客服B 节点1-->>用户A: 发送失败/无响应 end
Q: WebSocket 连接通过 Nginx 通过 Gateway 再到服务器,连接上之后就不会再走 Nginx 和Gateway 的均衡负载选择后端服务器了吗?
A: 连接建立后,WebSocket 数据仍需经过 Nginx 和 Gateway,因为它们作为代理维持了 TCP 长连接,会持续作为中间代理,转发所有数据帧,但不再重新选择服务器节点(除非连接中断或主动切换)
常见解决方案 广播模式 原理: 使用消息队列进行广播。 在接收到消息时,不是直接通过 WebSocket 发送消息给对应客户端,而是发布消息到中间件(如 Redis Pub/Sub、RabbitMQ),所有节点消费并判断是否需处理本地连接。
优点:实现简单,扩展性强。
缺点:网络流量大(消息被所有节点消费),给所有节点带来压力。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 sequenceDiagram participant 用户A as 用户A participant 节点1 as 节点1 participant MQ as 消息中间件 participant 节点2 as 节点2 participant 客服B as 客服B rect rgba(0, 100, 200, 0.1) note over 用户A,节点1: 用户A发送消息 用户A->>节点1: 发送消息给客服B note over 节点1,MQ: 发布消息到中间件 节点1->>MQ: 发布消息到队列{"target": "客服B", "content": "..."} note over MQ: 中间件广播消息 MQ-->>节点1: 推送消息(所有节点都收到) MQ-->>节点2: 推送消息 note over 节点1,节点1: 节点1处理消息 节点1->>节点1: 检查本地连接 客服B不在本节点 → 忽略 note over 节点2,客服B: 节点2处理消息 节点2->>节点2: 检查本地连接 找到客服B会话 节点2->>客服B: 推送消息 客服B-->>节点2: 接收成功 end
路由模式 原理: 中央路由表+精准投递。
如:连接时存储(用户ID, 节点ID)映射关系存储到路由表,发送消息时根据对方客户端所在的节点ID作为消息队列 RoutingKey,精确推送到目标节点的专属队列。
优点:精准投递,网络高效,消息只发送到目标节点。
缺点:
路由表依赖,Redis宕机导致系统不可用;
状态不一致,客户端意外断开重连到新节点而路由表未更新——可以通过缓存过期续期那一套解决;
节点1宕机后重启,用户2重新连接到了一个节点2,此前若有等待被发到用户2的消息,那么仍是节点1的专属队列,节点1正要给用户2发送消息时发现本地并没有这个 WebSoket Session,需要重新查询路由表投递给消息队列,导致延时增大;若节点1没恢复,消息则永远不会发送到用户2;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 sequenceDiagram participant C as 客户端A participant N1 as 节点1 participant Redis as 路由表(Redis) participant MQ as 消息队列 participant N2 as 节点2 participant S as 客服B C->>N1: 发送消息 N1->>Redis: 查询目标节点 Redis-->>N1: 返回节点2 N1->>MQ: 转发到节点2队列 MQ->>N2: 投递消息 N2->>S: 推送消息
哈希环 不借助这些队列/订阅机制,通过哈希函数将用户ID映射到 WebSocket 服务器,实现消息的路由和推送。该方案可以有效减少消息广播带来的网络流量,并提高消息传递的效率。 需要通过心跳等机制,去维护节点的存活情况,保证路由到的 WebSocket 服务器节点可用。