参考:

IM消息机制(一):保证在线实时消息的可靠投递
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
Telegram 安全方案解析 - 客户端到服务端的加密

分布式WebSocket

常见方案在我介绍分布式 WebSocket 的文章有说。
我偏好选择 Redis 中央路由+消息队列的方式。

使用 Netty 服务器代替 Tomcat,创建一个WebSocket服务器。

X3DH加密

X3DH仅用于单聊的消息,每条消息发送的时候,服务器端记录发送者的公钥(IK,EK),接收者的公钥(IK,SPK,OPK)。

消息的可靠投递

Q:为什么在有TCP的情况下还需要自己实现聊天消息的ACK机制?
A:TCP是“传输可靠”,应用层ACK是“业务可靠”。

TCP的可靠性确保数据从发送端的TCP层到接收端的TCP层的字节流完整、有序、无重复。
但是在IM系统里面光是传输到是没有意义的,TCP确保数据包到达接收端的TCP层,但接收方应用可能崩溃、卡死或处理失败,各种原因没及时保存数据包导致消息再也无法成功展示给用户,永久漏了这条消息。

六报文设计

IM系统的六报文设计是消息可靠投递的核心机制,通过双重确认超时重传确保消息不丢失。

报文分为三种:

  1. 请求报文(request,后简称为为R)
  2. 应答报文(acknowledge,后简称为A)
  3. 通知报文(notify,后简称为N)

msg:R/A/N:确保消息从发送方到接收方的可靠性。
ack:R/A/N:确保接收方已读消息,并通知发送方。

流程

  1. 发送请求(msg:R)- 客户端A发送消息到服务器(IM Server)。
  2. 发送确认(msg:A)- IM Server确认消息已接收,返回ACK给客户端A。
  3. 消息通知(msg:N)- IM Server将消息推送给客户端B(若在线)。
  4. 客户端B确认请求(ack:R)- 客户端B向 IM Server 发送ACK确认请求收到消息。
  5. 服务端确认(ack:A)- IM Server确认客户端B的ACK。
  6. 发送方通知(ack:N)- IM Server通知客户端A消息已送达客户端B。

可以在消息表维护一个status字段,记录目前状态(sent, delivered, read, finished),从而避免显示的ack:N推送。
finish字段用于过滤状态为已读的历史消息,减少网络IO以及没有必要的去重

整个流程大概如下(单聊):

超时与重传

若客户端向服务器发送消息后,没有收到 msg:A,则需要进行一定次数的重传,同时维护重传计数器,如首次超时 1s 重试,第二次 2s,第三次 4s,最大重试 3 次(避免无限重试),超过次数则标记 “发送失败” 并提示用户。

重传消息可能会导致多条相同消息发送,本来你也许只想发送一次该条消息的。那给消息设置一个唯一 ID 则是一个很好的解决方案。
如客户端使用 UUID 作为请求 ID 来跟踪定位这条消息,用于发送方确认是否成功发送以及获取对应的消息 ID,实际的消息 ID 则由服务器端生成(雪花算法 ID)。

消息同步机制

读扩散和写扩散

写扩散(Write Diffusion)

  • 核心逻辑:当一条消息发送到群聊时,将消息复制并存储到每个群成员的个人收件箱中。
  • 形象理解:类似于 “群发邮件”,发件人一次发送,系统自动给每个收件人复制一份存到他们的邮箱。
  • 读写特点
    • 写操作成本高(群成员越多,写入次数越多);
    • 读操作简单高效(用户只需查询自己的收件箱,无需额外计算)。

读扩散(Read Diffusion)

  • 核心逻辑:当一条消息发送到群聊时,只存储一份到群的公共消息列表中。用户读取消息时,再从公共列表中拉取自己有权限查看的消息。
  • 形象理解:类似于 “论坛帖子”,发帖人只发一次到论坛,用户各自去论坛查看。
  • 读写特点
    • 写操作成本低(无论群大小,只写一次);
    • 读操作复杂(需要计算用户可见范围、过滤已读消息等)。

写扩散理论上是每人有自己的信箱,可以使用统一的收件箱存储所有用户的扩散消息。发送者发送消息的时候,还需要额外为接收者在 DB 插入一条记录,每个用户根据自己的 ID 获取相应的扩散信息。

读扩散不需要为接收者额外插入一条记录,只需要记录发送者发出的消息,用户在消息表中使用自己的 ID 进行过滤获取。

单聊下读写扩散相差不大,差别在大群聊天就能体现出来了。
场景:用户A在1000人的群中发一条消息:
写扩散:先存 1 条原始消息到 message 表,再向 1000 个群成员的收件箱各写 1 条关联记录(inbox表需写 1000 条)。
读扩散:服务器只需将消息存储到消息表(1次写入),但是由于消息表可能积累很多历史消息的记录,所以读取最新消息时可能会读取大量历史消息再进行去重过滤比较耗时,但是这个缺点可以使用游标过滤解决。

推送机制

无论群聊还是单人,由于个人偏好我选择读扩散;

强制主动推送的时机是用户建立 WebSocket 长连接后;

Pull

双人聊天均使用服务器主动推送消息;群聊根据群聊大小需选择主动推送还是等待用户 pull。

当群内产生新消息时,服务器不直接通过 WebSocket 推完整消息,而是推一个 “极简信号”,告知客户端 “某个群有新消息,该 Pull 了”。

可以利用 Redis 做一个群聊未读功能,只存储群聊未读消息计数不存储信息,等用户进入聊天会话的时候再开始分页查询。用户发送消息时,给该群所有用户的未读计数器计数+1;——使用LUA
不是给 1000 人大群建 1000 个独立计数器,而是用 “群为 key,用户为 field, 未读消息计数为 value” 的 Hash 结构,1 个群的所有用户计数器存在 1 个 Redis 键里。

在线触发时机:服务器需维护 “用户 - WebSocket 连接” 映射表,同时维护 “群 - 用户” 在线关系表,群消息成功写入数据库后,筛选出 “在群内且当前 WebSocket 在线” 的用户,仅给这些用户触发信号推送;

离线后登陆策略:获取用户所有群的未读计数器,避免一登陆就要等待比较长时间的检索以及大量 ACK:R/A/N 报文导致客户端和服务端出现性能问题。

考虑群聊未读计数器的持久化。

游标

被动推送的方法是 pull + 游标,消息表索引是个联合索引,注意左前缀原则避免索引失效。如果偏移值是 msg_id,那么 msg_id 索引需要放到联合索引最后避免索引失效。

但是使用 msg_id 单个游标可能会在群聊出现漏消息问题,因为不同用户可能在多个节点生成多个雪花 ID,较小的那个因为网络等问题而导致最后才入库,而此时用户已经更新了游标,导致较早生成(ID较小)但是落库较晚的消息被跳过,并且永远无法被感知读取也是有可能的。
这种情况可以使用复合游标去解决,使用(宽松时间戳,msg_id)作为游标更加安全。Pull 时使用created_at > ? OR (created_at = ? AND msg_id > ?) 去查询,时间戳优先,ID兜底。

仅使用纯时间戳作为游标也有点问题,Pull 是 “即时查询”,同一时间戳有多条消息可能出现楼消息的情况:
消息 X:ID=101(先落库);
消息 Y:ID=102(中间落库);
消息 Z:ID=103(因节点压力延迟 0.5 秒落库)
此时用户读到Y就返回了,下次查询时使用 created_at > ? 来查询就会漏掉Z。

时间戳建议用MySQL函数生成来保证全局一致性
宽松时间戳是指上条消息的时间戳减去n秒(向前多取1秒)

消息分片

某帖子看到的,也许也可以考虑:
超大群的消息可以采用消息分片,服务器端将群消息按 “时间片” 分片存储到缓存(如每5min一个分片,避免大key问题)。
群消息落库后,同步写入 Redis 缓存,避免每次拉取都查底层数据库。如果客户端的游标时间戳(最后一条消息的时间)在这个分片之前,那就到数据库去拉去;否则先从 Redis 获取到分片的所有消息,再根据复合游标过滤已接收的消息再发送给用户端。