参考:

IM消息机制(一):保证在线实时消息的可靠投递
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
Telegram 安全方案解析 - 客户端到服务端的加密

分布式 WebSocket

IM 系统对时效要求比较高,轮询方案虽然能实现消息获取,但会把实时性建立在大量重复请求上,代价并不划算。WebSocket 在握手之后保持长连接,更适合这种服务端主动推送的场景。

这里选择 Netty 而不是 Tomcat,主要出于考虑:

  1. 长连接接入层可以单独控制,线程模型更清晰
  2. WebSocket 接入、业务处理、跨节点转发更容易拆开

另外,Netty 的 Reactor 线程模型天然将 I/O 处理与业务线程分离,比 Tomcat 的 Servlet 线程模型更适合大量 WebSocket 长连接场景。若配合虚拟线程,业务处理部分的并发吞吐还能进一步提升。

分布式 WebSocket 的相关文章我也有介绍,这里省略点吧。

接入架构

整体的架构可能是这样:

1
2
3
4
5
6
Client  
-> Gateway
-> Netty WebSocket 节点
-> Redis 路由
-> MQ 跨节点转发
-> MySQL 持久化

接入节点时尽量只做轻量工作,例如:

  • 连接建立
  • 身份认证
  • 心跳处理
  • 连接上下文绑定
  • 基础报文编解码

数据库访问、关系校验、消息写入和跨节点转发,不适合放在 EventLoop 线程里直接完成,可以选择线程池或者虚拟线程。

Redis 路由模型

分布式场景下,服务端必须知道某个用户、某台设备当前挂在哪个节点上。
因此,Redis 中保存的就不该只是一个“在线标志”,而应当是可用于路由判断的连接信息

当前实现是基于 sessionId 管连接,所以这部分更贴切的表达应该是:

1
2
3
4
5
6
ws:user:devices:{userId} -> Set(deviceId)

ws:route:{userId}:{deviceId} -> {
nodeId,
sessionId
}

这里的 sessionId 是当前连接的唯一标识。
同一设备重新登录时,会生成新的 sessionId,并覆盖 Redis 中对应的路由记录。

sessionId 的作用

如果系统支持多设备在线,同时还要处理同一设备重复登录的情况,sessionId 就很有用。

原因在于,下面几步不是原子完成的:

  • 新连接建立
  • Redis 路由覆盖
  • 旧节点收到下线通知
  • 旧连接被清理

若旧节点只按 userId + deviceId 粗略判断,很容易在通知迟到时误伤新连接。
而有了 sessionId 之后,旧节点在关闭连接前,只需要先比较:

  • 本地连接的 sessionId
  • Redis 当前保存的 sessionId

只有两者仍然对应同一条旧连接,才执行关闭。这样处理,跨节点顶号会更加合理。

路由覆盖与清理

如果同一设备重复登录,新连接会生成新的 sessionId,并覆盖旧路由。
旧节点收到下线通知后,不应该只按 userIddeviceId 去清理连接,而应当带上目标 sessionId 一起判断。

更好一点的处理方式如下:

1
2
if local.sessionId == targetSessionId:  
close()

这样可以避免迟到的通知误伤后建立的新连接。

消息加密

单聊场景下,可以使用 X3DH 完成初始密钥协商进行端对端加密。
这里需要特别说明:X3DH 更适合放在会话建立阶段,而不是每条消息发送时都重新执行一次。

比较常见的做法是:

  • 用户长期持有身份密钥 IK
  • 接收方在服务端发布预密钥包,例如 IKSPKOPK
  • 发送方首次发起会话时临时生成 EK
  • 双方基于这些公钥材料协商出初始共享密钥

服务端保存的是公钥材料,不是私钥。
后续逐条消息的加密,通常应交给会话密钥链继续推进,而不是反复重新走 X3DH。

群聊的场景使用对称加密也许是更好的选择。

可靠投递

不可靠的可能性

TCP 解决的是传输层的字节流可靠性,但 IM 系统关心的是业务层的消息可靠性

也就是说,TCP 可以保证数据到达对端 TCP 层,却不能保证:

  • 接收端应用已成功处理该消息
  • 消息已进入本地存储
  • 客户端重连后还能继续恢复这条消息

所以,IM 系统仍然需要自己的业务确认机制,如经典的六报文确认机制。

六报文设计

IM 系统的六报文设计是消息可靠投递的核心机制

报文分为三种:

  1. 请求报文(request,后简称为为 R)
  2. 应答报文(acknowledge,后简称为 A)
  3. 通知报文(notify,后简称为 N)

msg:R/A/N:确保消息从发送方到接收方的可靠性。
ack:R/A/N:确保接收方已读消息,并通知发送方。

流程

  1. 发送请求(msg:R)- 客户端 A 发送消息到服务器(IM Server)。
  2. 发送确认(msg:A)- IM Server 确认消息已接收,返回 ACK 给客户端 A。
  3. 消息通知(msg:N)- IM Server 将消息推送给客户端 B(若在线)。
  4. 客户端 B 确认请求(ack:R)- 客户端 B 向 IM Server 发送 ACK 确认请求收到消息。
  5. 服务端确认(ack:A)- IM Server 确认客户端 B 的 ACK。
  6. 发送方通知(ack:N)- IM Server 通知客户端 A 消息已送达客户端 B。

在实践中,当用户客户端设备均不在线时,msg 和 ack 的 N 采用客户端 pull 来完成,以替代服务端的推送队列。

另外,“到达对端”这个通知一般上说作用其实并没有这么大,像是 TG、飞书之类的几乎没用到这个报文完成一些功能,所以可以考虑按分层来精简降低复杂度:

  • 基础可靠:msg:R + msg:A
  • 投递确认:msg:N(可选)
  • 已读:ack:R/A/N

发送端幂等

发送端应为每条消息带一个本地生成的唯一标识,例如 clientMsgId

1
2
3
4
5
6
{  
"clientMsgId": "2f88d6d4-7c30-4c52-9e65-1d0b2a1c5a12",
"conversationId": 10001,
"senderId": 20001,
"content": "hello"
}

服务端收到后,先根据 clientMsgId 做幂等判断:

  • 已处理:直接返回已有结果
  • 未处理:继续分配消息 ID、写入数据库、推进状态

服务端自己的消息主键可以继续使用雪花 ID。
这样,本地请求标识和服务端消息标识各司其职,不会混在一起。

超时与重传

发送端如果在一段时间内收不到 response,可以重试。
比较常见的策略如下:

首次超时:1s
第二次:2s
第三次:4s
最大重试次数:3

达到上限后,可以把消息标记为发送失败,并提示用户。
真正重要的不是重试时间本身,而是重试时必须复用同一个 clientMsgId,否则重发会被服务端当成新消息处理。

多设备与在线状态

在线判定

在多设备场景下,设备在线和用户在线不是同一层概念:

  • 设备在线:该设备当前存在有效 WebSocket 路由
  • 用户在线:该用户至少有一台设备在线

因此,判断某个用户是否在线,不能只看单条记录,而应当先取出该用户所有设备,再看是否至少存在一条有效路由。

心跳机制

设备在线状态可以通过心跳维护。
例如:

心跳间隔:60s
过期时间:90s

这种配置的目的,不是追求毫秒级精确,而是在两件事之间做平衡:

  • 状态修正不能太慢
  • 网络抖动不能轻易把设备误判成离线

可以考虑在每一次接发消息时顺带去更新心跳,减少部分网络 IO。

在线状态与路由状态

设备只要完成 WebSocket 注册,并且心跳未过期,就可以视为在线。
账号在线则是基于设备在线再向上一层聚合的结果,也就是只要任一设备在线,该用户就视为在线。

这里不再额外做一套复杂的在线判定结构,直接围绕设备路由和过期时间去判断即可。

同步模型

读扩散与写扩散

群聊同步通常绕不开读扩散和写扩散的取舍。

写扩散

写扩散是把一条群消息复制给每个成员的个人收件箱。
优点是读取简单,缺点是写入成本高。群越大,写放大越明显。

如果做成收件箱模型,发送者发出一条消息后,除了在消息表里落一条原始消息,还需要为每个接收者额外写一条关联记录。单聊下这个成本还好,大群里就会比较明显。

读扩散

读扩散只保存一份群消息,用户读取时再按会话去拉。
优点是写入轻,缺点是读取和状态计算更复杂。

我更倾向读扩散,单聊下读写扩散相差不大,差别在大群聊天就能体现出来了。
场景:用户 A 在 1000 人的群中发一条消息:
写扩散:先存 1 条原始消息到 message 表,再向 1000 个群成员的收件箱各写 1 条关联记录(inbox 表需写 1000 条)。
读扩散:服务器只需将消息存储到消息表(1 次写入),但是由于消息表可能积累很多历史消息的记录,所以读取最新消息时可能会读取大量历史消息再进行去重过滤比较耗时,但是这个缺点可以使用条件过滤解决。

会话级同步

IM 系统里的消息同步,麻烦的地方不只是把消息查出来。真正要处理的是几个边界:

  • 客户端断线后不能漏消息
  • 同一会话内消息顺序不能乱
  • 用户多端登录时,要能从同一个位置继续同步
  • 在线推送和离线补拉要使用同一套位置语义

如果只依赖消息 ID 或时间戳作为拉取游标,这些问题会变得难处理。更稳妥的做法,是给每个会话维护一条独立递增的序列号,也就是 seq。

一条消息写入某个会话时,服务端为它分配该会话内的下一个 seq。这个 seq 只在当前 conversationId 下有意义,不要求全局连续。这样一来,客户端同步消息时,不需要关心消息在哪个节点生成,也不需要依赖服务端时间,只要记住自己在这个会话中已经同步到哪个 seq 即可。

1
2
3
4
5
6
select *
from message
where conversation_id = ?
and seq > ?
order by seq asc
limit ?

这条查询对应的语义很直接:从某个会话里,取出客户端尚未同步的后续消息。只要 seq 的分配没有重复,也没有倒退,客户端就可以按区间补拉,服务端也能保证返回顺序。

会话状态

围绕 seq,服务端至少需要维护几类状态:

  • lastSeq:当前会话已经产生的最大 seq
  • readSeq:某个用户在该会话中已经读到的位置
  • localSeq:客户端本地已经同步到的位置

其中,lastSeq 属于会话维度,readSeq 属于用户和会话的关系维度,localSeq 则是客户端本地状态。

单聊和群聊都可以套用这一模型。区别在于,群聊还需要结合成员关系判断用户能看到哪些消息,以及哪些在线用户需要被投递。

未读数也可以从这里计算出来:
$unread = lastSeq - readSeq$

这种方式比单独维护未读计数器更不容易出现状态偏差。只要 readSeq 推进正确,未读数就是一个派生结果,而不是另一份需要额外同步维护的数据。

为什么不用 msg_id 做游标

服务端消息主键可以继续使用雪花 ID 或数据库自增 ID,但它不适合作为会话同步游标。

原因在于,消息 ID 通常表达的是全局生成顺序,而不是某个会话内的写入顺序。分布式环境下,不同节点生成 ID、写库、投递的时间并不完全一致。较早生成的消息,可能因为节点负载、网络抖动或事务等待而较晚入库。

如果客户端已经把游标推进到一个更大的消息 ID,后面再落库的一条较小 ID 消息,就可能被跳过。对于 IM 来说,这类问题很难接受,因为用户看到的结果就是消息丢了。

conversationId + seq 的含义更窄,也更适合消息同步。它只要求在同一个会话内递增,不试图表达全局顺序。IM 客户端真正关心的,通常也是某个会话里的消息顺序,而不是全站所有消息的先后关系。

为什么不用时间戳做游标

时间戳也有类似问题。(时间戳由服务端生成和精度而产生的问题)

如果客户端用下面这种条件拉消息:

1
2
3
4
5
6
select *
from message
where conversation_id = ?
and created_at > ?
order by created_at asc
limit ?

边界处很容易出错。多条消息可能拥有相同的时间戳;不同应用节点的时钟也可能存在偏差;某条消息生成得早,但因为写库延迟,实际落库时间晚于客户端上一次同步。

即使把查询改成 created_at + msg_id 的组合游标,也只是缓解问题,不能从根上解决会话内顺序的定义问题。因为时间戳描述的是某个时间点,而不是会话中的第几条消息。

seq 则不同。它直接把顺序变成了会话内的编号,与消息是一对一的关系。客户端同步到 seq = 100,下一次就从 seq > 100 开始拉。这个模型更贴近 IM 的业务语义。

seq 生成方案

seq 的关键不是形式,而是分配方式。对同一个 conversationId,服务端必须保证 seq 单调递增且不重复。

常见做法有几种:

  • 使用 Redis INCR 为每个会话生成递增序号
  • 在数据库的会话表中维护 last_seq,通过行锁或事务推进
  • 使用单独的序列号服务,为会话分配 seq

如果系统规模不大,数据库事务方案更容易保证一致性。消息写入时,先锁定会话记录,推进 last_seq,再把消息和对应 seq 一起写入消息表。缺点是热点群聊会给数据库带来压力。

Redis INCR 性能更好,但要处理 Redis 成功递增后消息写库失败的情况。此时可能出现 seq 空洞。一般来说,seq 不一定要求绝对连续,但客户端拉取逻辑必须能接受中间缺号。如果业务强依赖连续序号,就需要额外的事务补偿或写入状态表。

推送与拉取

消息分发不适合只依赖单一的 Push 或 Pull。更常见的方式,是把链路拆成两部分:

  • 在线链路:用户在线时,服务端直接 push 新消息
  • 补偿链路:用户上线、重连或发现消息缺口时,客户端主动 pull 缺失消息

在线 push 负责实时性,离线 pull 负责可靠性。两条链路使用同一套 conversationId + seq 语义,客户端只需要维护自己在每个会话中的同步位置。

用户在线时,服务端可以直接推送新消息。客户端收到后,如果发现新消息的 seq 正好等于本地 localSeq + 1,就可以直接展示,并推进本地同步位置。

如果收到的 seq 大于 localSeq + 1,说明中间有消息缺失。此时不应该直接把这条消息插入正常时间线,而应先触发一次补拉,把缺失区间补齐后,再按 seq 排序展示。

这个处理可以覆盖网络抖动、跨节点推送延迟、客户端短暂断线等情况。服务端 push 不需要承担所有可靠性问题,客户端也不需要定时全量拉取历史消息。

上线与重连同步

用户上线、重连或切换设备后,不适合让服务端把所有历史消息重新 push 一遍。更合适的方式,是服务端先返回每个会话的位置信息,例如 lastSeq 和用户自己的 readSeq,客户端再根据本地 localSeq 判断是否需要补拉。

如果客户端发现某个会话的 localSeq 小于服务端返回的 lastSeq,说明本地还有消息没有同步。此时客户端带上 conversationId + localSeq 请求服务端,服务端返回该会话中 seq > localSeq 的消息。

1
conversationId + localSeq -> pull messages where seq > localSeq

这个流程本质上不是重新同步全部历史,而是从客户端已知的位置继续往后拉。无论是断线重连,还是多端登录,只要客户端和服务端都围绕同一个 seq 工作,就可以从明确的位置继续同步。

群聊与单聊

单聊和群聊在客户端同步模型上没有本质区别,都是围绕 conversationId + seq 拉取消息。

差别主要在服务端投递过程。

单聊只需要定位接收方设备,再判断用户是否在线。群聊则需要先根据成员关系筛选可见用户,再判断哪些用户在线,最后按用户所在节点进行分发。

无论是单聊还是群聊,消息一旦写入会话,就应该先获得该会话内的 seq。后续 push、pull、未读计算,都基于这个 seq 展开。

缓存分片

曾经我考虑过消息缓存的问题:对于成员很大的群,这几万人都去获取群里相同的一条消息,是否需要把最近一段时间的群消息写入 Redis 缓存分片呢?如:

  • 每 5 分钟一个分片
  • 消息落库后同步写入当前分片
  • 客户端优先查最近分片
  • 超出窗口后再回数据库

其实如果是群成员在线情况更多的话,其实这个优化也没太大作用,因为对于在线用户服务端都会主动去 push 新消息,这是避免不了的资源消耗…

这个想法可能对断线后拉新消息的群成员更多的情况有点帮助,但可能也不多。因为你要考虑这个缓存分片多久过期,过期前是否会有较多的用户去 pull 群消息。如果这两个问题没想清楚的话,这个优化也许本身没啥作用。

总结

把上面几块拼起来,一个相对可行的方案大概是这样:

  • Netty 负责 WebSocket 接入
  • Redis 保存用户、设备、节点与 sessionId 路由
  • 消息加密:单聊使用 X3DH 加密,群聊使用对称加密
  • 消息可靠性依赖应用层确认,而不是只依赖 TCP
  • 单聊、群聊统一成会话级 seq 同步,未读数由 lastSeq - readSeq 计算,不再额外维护未读计数器
  • 用户在线时,服务端直接 push 消息
  • 用户断开连接重连或刚上线时,客户端根据 lastSeq/readSeq 主动 pull 补齐历史消息

对于 IM 系统的设计上,我的个人理解时要专注于消息的可靠性方面,必须做到不漏消息、不乱序、离线后可正确拉取历史消息。之后可能就是多端登陆、顶号下线之类的功能,当然消息加密方面我个人觉得这也是不可忽视的一环。