参考:

怎么保证mq消息顺序性?保证不了
MQ系列12:如何保证消息顺序性

消息的有序性在很多业务场景中占有很重要的位置。
比如购物场景,需要按照 创建订单 -> 订单付款 -> 完成订单 顺序执行。
又比如出行场景,接单 -> 接送到达目的地 -> 付款 -> 完成订单。
这种是严格按照顺序执行的,这样的顺序消费才不会出问题,而且各个订单之间是互相独立和并行执行的。
所以,在 MQ 中,如何稳定地保证顺序性消息处理,是一个不可避免的话题。

消息的顺序性说明

消息的有序执行,一般不是单个组件的能力。而是整个消息从生产,排队,存储到消费都是有序的,比如上面提到的购物和出行场景。

这就要求我们在消息队列(无论是 Kafka、RocketMQ 还是 RabbitMQ)中,保证以下前提:

  • 消息生产的有序性:即生产者组件有序发送消息。
  • 消息入出队列的有序性:消息需严格按照进入队列的先后顺序入队和出队,遵循 FIFO 原则。
  • 消息的存储的有序性:与上一点一致,部分场景下为提高可靠性需持久化到磁盘,这时候应该遵循有序存放,才能保证后续有序消费。
  • 消息消费的有序性:即按照顺序进行消费。又分为全局顺序消息与局部顺序消息,全局是指 Topic 下的所有消息都要保证顺序;局部顺序消息保证每一组消息被顺序消费即可。

全局有序和局部有序

全局有序是所有消息按发送顺序消费,局部有序是指定业务维度的消息按顺序消费

类型 定义 业务场景 实现成本
全局有序 所有消息严格遵循 “发送顺序 = 消费顺序” 股票交易行情(需全量按时间排序) 极高(仅单队列 + 单消费者)
局部有序 同一业务标识(如订单 ID、用户 ID)的消息按顺序消费 电商订单(下单→支付→发货)、外卖订单(下单→接单→配送) 较低(多队列 + 按业务 ID 路由)

大多数业务无需追求全局有序,只需保证局部有序(如同一订单、同一用户的消息顺序)。

举个例子:
外卖平台无需保证 “用户 A 的下单消息” 和 “用户 B 的下单消息” 的顺序(保证了也没有什么意义,因为两个订单是否全局有序执行不会有任何影响),但必须保证 “用户 A 的下单 → 接单 → 配送” 三条消息按顺序消费。若 “配送” 消息先被处理,会出现 “订单还没被接单就开始配送” 的逻辑错误。

如果想让全局都是顺序性消费,那么只能用一个消费者去消费队列(一般来说也是单个生产者),这是会严重影响整体性能的。

所以接下来的内容主要围绕局部有序这个需求进行。

消息有序性的核心

MQ存储消息的有序性

消息队列(MQ)本身的特性决定了其存储的顺序性能力:

  • 单队列/分区的 FIFO 特性:同一队列/分区的消息存储和投递遵循先进先出原则,MQ 会将同一队列的消息按投递顺序持久化到磁盘
  • 跨队列/分区的无序性:不同队列/分区间的消息可能存在网络延迟差异,MQ本身不保证跨队列/分区的全局顺序

所以按顺序发送到 MQ 单队列的消息,取出的时候也是有序的,不需要我们额外做些什么。
我们只需要保证消息的生产和消费时的顺序性就行了。

消息生产的有序性

要保证整个消息队列的有序性执行,首先要保证消息生产的有序性。
一个完整的过程如果被分配到了不同的队列/分区,这可能是消息乱序的起点。

举个例子:
一次完整的消费过程:创建订单、付款、完成订单
如果这三个消息分别在三个不同的队列,那这三个步骤可以说是并行执行的,很可能 “完成订单” 反而先被消费,发生逻辑错误。

所以我们必须保证一组顺序的消息都存入同一个队列/分区中。

方案如下:
自定义路由算法,让需有序的消息进入同一队列

  • 一个订单的多个子消息的父订单号是一致,我们把这些消息按照订单号取模,投送到对应的 Queue 中就行了,比如 订单号 % 队列数量( 163105015 % 9)
  • 发送消息自定义消息标签(消息标签可以用队列编号命名),一组消息使用同一个标签,该组标签对应的消息都投向标签所在的队列。

我们可以定义多个队列/分区,多个队列同时存在,也意味着可以存在多个消费者。这显然比全局有序需要满足的条件 ”只允许单个队列-单个消费者“ 性能将会更好。

消息消费的有序性

既然消息生产和消息持久化都可以做到有序性。那么只要保证消费的有序性,就能保证整个消息队列的有序执行。

方案如下:
每个队列只能由一个消费者消费

每个 MQ 消息队列只能由一个消费者消费的话,这个消费者最终能拿到MQ消息队列中所有的消息,所以拿到的消息在总体上是有序的(FIFO),可避免多消费者拆分队列导致的顺序混乱。

每个队列只能由一个消费者消费的方案虽能保证顺序,但会导致性能瓶颈 —— 若队列中消息量过大,单个消费者串行处理会造成消息积压。

单消费者效率低的问题也有可行的解决方案:
消费者内部维护多个阻塞队列,把同一业务 ID 消息投递到同一个阻塞队列,每个阻塞队列单线程串行,多个阻塞队列并行处理

注意事项:

  1. 阻塞队列需单线程串行处理,可以绑定阻塞队列和独立线程来实现。通过多个阻塞队列并行的方式达到多线程的效果。(否则这与多个消费者消费同一个 MQ 队列没有什么区别了)
  2. 当然这种方法要注意对 MQ 进行 ACK 的时机,因为要放到阻塞队列里面执行,所以 ACK 机制必须换成手动 ACK。时机是从阻塞队列取出消息并处理成功后。
  3. RabbitMQ 的 prefetch 机制控制的是未 ACK 消息的最大数量(预取数),只要未 ACK 消息数低于 prefetch 上限,即使有未处理完成的消息,消费者也会拉取新消息补充到额度上限,不会等待所有消息处理完。(比如设置 prefetch=5,目前有 3 条同时 ACK,另两条未 ACK,消费者不会等待另外两条消息任务执行完成才向 MQ 一下子获取 5 条新消息,而是 ACK 之后检查未 ACK 最大数量,小于 prefetch 则立即获取新消息补至预取上限)

导致乱序的其他因素

扩容缩容导致乱序

上述的保证局部有序较高性能的方案,是绝对不能随便对队列进行扩容缩容的。
一旦扩容缩容,消息队列数量就变化了,从而导致本该在一个队列的消息被分配到不同的队列。

举个例子:
“下单” 消息已经分配到队列1,在投递 “支付” 消息前,此时发生扩容,可能导致 “支付” 消息被分配到队列2,上述也说了消息如果不在同一个 MQ 消息队列/分区,是不能保证顺序性的。

所以最好是先把队列数量固定。或者把出现乱序的同一业务 ID 的所有消息重新投递到特殊的队列处理(这个队列做好完全的顺序检查,可以重复入队等来保证消息的顺序执行)。或者双写过渡,先同时向新旧队列写入消息,待旧队列消息消费完毕后,再停止向旧队列写入、开启新队列的消费。

Rebalance导致乱序

MQ 的 “Rebalance” 机制(如 Kafka 的消费者组重平衡)会在 “消费者数量变化”(如下线、新增消费者)时,重新分配 “消费者 - 队列” 的绑定关系。若原消费某队列的消费者被分配到新队列,历史消息与新消息会被不同消费者处理,引发乱序。

消息重试导致乱序

消息重试也会破坏顺序性。当消息处理失败或者被拒绝时,MQ 会将消息重新入队(当做新的消息被放到当前队列末尾),若此时重试消息(第一顺序执行的消息)排在新消息(如第二顺序才执行的消息)之后,会导致需要有序消费的消息乱序。

因此消息消费失败只能进行本地重试->死信队列,不能把消息放回到MQ队列重试。

如果采用消费者内部维护阻塞队列的方案,只允许线程内同步重试,处理消息的单线程在当前消息处理失败后,直接在当前上下文循环重试(如最多3次),重试期间不处理阻塞队列的下一条消息。从而避免消息重新入队阻塞队列,打乱阻塞队列中消息的顺序。

兜底措施

最好把 MQ 看作不太可靠的中间件,不要全依靠 MQ 保证有序性和可靠性。

不依靠 MQ 队列实现有序性,就需要在消费消息的时候进行业务检查,如:

  1. 保证消息消费的幂等性;
  2. 检查上一个顺序消息是否被消费完成,没有被消费完成的话本消息可以选择重新投递到 MQ 队列消息/死信队列,也可以先尝试重入消费者内部的阻塞队列。
  3. 开启定时任务检查对应的数据,做好补偿策略等。

上述方法可以作为兜底,全做好了也可以完全不依靠 MQ 来保证有序性和可靠性。但是这种全面检查肯定是需要时间和资源的,在高并发的情况下不太适用。

总结

  1. 自定义路由算法,让需有序的消息进入同一队列
  2. 每个队列只能由一个消费者消费
  3. 消费者内部维护多个阻塞队列,把同一业务 ID 消息投递到同一个阻塞队列,每个阻塞队列单线程串行,多个阻塞队列并行处理