如何保证MQ消息顺序性
参考:
怎么保证mq消息顺序性?保证不了
MQ系列12:如何保证消息顺序性
消息的有序性在很多业务场景中占有很重要的位置。
比如购物场景,需要按照 创建订单 -> 订单付款 -> 完成订单 顺序执行。
又比如出行场景,接单 -> 接送到达目的地 -> 付款 -> 完成订单。
这种是严格按照顺序执行的,这样的顺序消费才不会出问题,而且各个订单之间是互相独立和并行执行的。
所以,在 MQ 中,如何稳定地保证顺序性消息处理,是一个不可避免的话题。
消息的顺序性说明
消息的有序执行,一般不是单个组件的能力。而是整个消息从生产,排队,存储到消费都是有序的,比如上面提到的购物和出行场景。
这就要求我们在消息队列(无论是 Kafka、RocketMQ 还是 RabbitMQ)中,保证以下前提:
- 消息生产的有序性:即生产者组件有序发送消息。
- 消息入出队列的有序性:消息需严格按照进入队列的先后顺序入队和出队,遵循 FIFO 原则。
- 消息的存储的有序性:与上一点一致,部分场景下为提高可靠性需持久化到磁盘,这时候应该遵循有序存放,才能保证后续有序消费。
- 消息消费的有序性:即按照顺序进行消费。又分为全局顺序消息与局部顺序消息,全局是指 Topic 下的所有消息都要保证顺序;局部顺序消息保证每一组消息被顺序消费即可。
全局有序和局部有序
全局有序是所有消息按发送顺序消费,局部有序是指定业务维度的消息按顺序消费
| 类型 | 定义 | 业务场景 | 实现成本 |
|---|---|---|---|
| 全局有序 | 所有消息严格遵循 “发送顺序 = 消费顺序” | 股票交易行情(需全量按时间排序) | 极高(仅单队列 + 单消费者) |
| 局部有序 | 同一业务标识(如订单 ID、用户 ID)的消息按顺序消费 | 电商订单(下单→支付→发货)、外卖订单(下单→接单→配送) | 较低(多队列 + 按业务 ID 路由) |
大多数业务无需追求全局有序,只需保证局部有序(如同一订单、同一用户的消息顺序)。
举个例子:
外卖平台无需保证 “用户 A 的下单消息” 和 “用户 B 的下单消息” 的顺序(保证了也没有什么意义,因为两个订单是否全局有序执行不会有任何影响),但必须保证 “用户 A 的下单 → 接单 → 配送” 三条消息按顺序消费。若 “配送” 消息先被处理,会出现 “订单还没被接单就开始配送” 的逻辑错误。
如果想让全局都是顺序性消费,那么只能用一个消费者去消费队列(一般来说也是单个生产者),这是会严重影响整体性能的。
所以接下来的内容主要围绕局部有序这个需求进行。
消息有序性的核心
MQ存储消息的有序性
消息队列(MQ)本身的特性决定了其存储的顺序性能力:
- 单队列/分区的 FIFO 特性:同一队列/分区的消息存储和投递遵循先进先出原则,MQ 会将同一队列的消息按投递顺序持久化到磁盘
- 跨队列/分区的无序性:不同队列/分区间的消息可能存在网络延迟差异,MQ本身不保证跨队列/分区的全局顺序
所以按顺序发送到 MQ 单队列的消息,取出的时候也是有序的,不需要我们额外做些什么。
我们只需要保证消息的生产和消费时的顺序性就行了。
消息生产的有序性
要保证整个消息队列的有序性执行,首先要保证消息生产的有序性。
一个完整的过程如果被分配到了不同的队列/分区,这可能是消息乱序的起点。
举个例子:
一次完整的消费过程:创建订单、付款、完成订单
如果这三个消息分别在三个不同的队列,那这三个步骤可以说是并行执行的,很可能 “完成订单” 反而先被消费,发生逻辑错误。
所以我们必须保证一组顺序的消息都存入同一个队列/分区中。
方案如下:
自定义路由算法,让需有序的消息进入同一队列
- 一个订单的多个子消息的父订单号是一致,我们把这些消息按照订单号取模,投送到对应的 Queue 中就行了,比如 订单号 % 队列数量( 163105015 % 9)
- 发送消息自定义消息标签(消息标签可以用队列编号命名),一组消息使用同一个标签,该组标签对应的消息都投向标签所在的队列。
我们可以定义多个队列/分区,多个队列同时存在,也意味着可以存在多个消费者。这显然比全局有序需要满足的条件 ”只允许单个队列-单个消费者“ 性能将会更好。
消息消费的有序性
既然消息生产和消息持久化都可以做到有序性。那么只要保证消费的有序性,就能保证整个消息队列的有序执行。
方案如下:
每个队列只能由一个消费者消费
每个 MQ 消息队列只能由一个消费者消费的话,这个消费者最终能拿到MQ消息队列中所有的消息,所以拿到的消息在总体上是有序的(FIFO),可避免多消费者拆分队列导致的顺序混乱。
每个队列只能由一个消费者消费的方案虽能保证顺序,但会导致性能瓶颈 —— 若队列中消息量过大,单个消费者串行处理会造成消息积压。
单消费者效率低的问题也有可行的解决方案:
消费者内部维护多个阻塞队列,把同一业务 ID 消息投递到同一个阻塞队列,每个阻塞队列单线程串行,多个阻塞队列并行处理。
注意事项:
- 阻塞队列需单线程串行处理,可以绑定阻塞队列和独立线程来实现。通过多个阻塞队列并行的方式达到多线程的效果。(否则这与多个消费者消费同一个 MQ 队列没有什么区别了)
- 当然这种方法要注意对 MQ 进行 ACK 的时机,因为要放到阻塞队列里面执行,所以 ACK 机制必须换成手动 ACK。时机是从阻塞队列取出消息并处理成功后。
- RabbitMQ 的
prefetch机制控制的是未 ACK 消息的最大数量(预取数),只要未 ACK 消息数低于prefetch上限,即使有未处理完成的消息,消费者也会拉取新消息补充到额度上限,不会等待所有消息处理完。(比如设置prefetch=5,目前有 3 条同时 ACK,另两条未 ACK,消费者不会等待另外两条消息任务执行完成才向 MQ 一下子获取 5 条新消息,而是 ACK 之后检查未 ACK 最大数量,小于prefetch则立即获取新消息补至预取上限)
导致乱序的其他因素
扩容缩容导致乱序
上述的保证局部有序较高性能的方案,是绝对不能随便对队列进行扩容缩容的。
一旦扩容缩容,消息队列数量就变化了,从而导致本该在一个队列的消息被分配到不同的队列。
举个例子:
“下单” 消息已经分配到队列1,在投递 “支付” 消息前,此时发生扩容,可能导致 “支付” 消息被分配到队列2,上述也说了消息如果不在同一个 MQ 消息队列/分区,是不能保证顺序性的。
所以最好是先把队列数量固定。或者把出现乱序的同一业务 ID 的所有消息重新投递到特殊的队列处理(这个队列做好完全的顺序检查,可以重复入队等来保证消息的顺序执行)。或者双写过渡,先同时向新旧队列写入消息,待旧队列消息消费完毕后,再停止向旧队列写入、开启新队列的消费。
Rebalance导致乱序
MQ 的 “Rebalance” 机制(如 Kafka 的消费者组重平衡)会在 “消费者数量变化”(如下线、新增消费者)时,重新分配 “消费者 - 队列” 的绑定关系。若原消费某队列的消费者被分配到新队列,历史消息与新消息会被不同消费者处理,引发乱序。
消息重试导致乱序
消息重试也会破坏顺序性。当消息处理失败或者被拒绝时,MQ 会将消息重新入队(当做新的消息被放到当前队列末尾),若此时重试消息(第一顺序执行的消息)排在新消息(如第二顺序才执行的消息)之后,会导致需要有序消费的消息乱序。
因此消息消费失败只能进行本地重试->死信队列,不能把消息放回到MQ队列重试。
如果采用消费者内部维护阻塞队列的方案,只允许线程内同步重试,处理消息的单线程在当前消息处理失败后,直接在当前上下文循环重试(如最多3次),重试期间不处理阻塞队列的下一条消息。从而避免消息重新入队阻塞队列,打乱阻塞队列中消息的顺序。
兜底措施
最好把 MQ 看作不太可靠的中间件,不要全依靠 MQ 保证有序性和可靠性。
不依靠 MQ 队列实现有序性,就需要在消费消息的时候进行业务检查,如:
- 保证消息消费的幂等性;
- 检查上一个顺序消息是否被消费完成,没有被消费完成的话本消息可以选择重新投递到 MQ 队列消息/死信队列,也可以先尝试重入消费者内部的阻塞队列。
- 开启定时任务检查对应的数据,做好补偿策略等。
上述方法可以作为兜底,全做好了也可以完全不依靠 MQ 来保证有序性和可靠性。但是这种全面检查肯定是需要时间和资源的,在高并发的情况下不太适用。
总结
- 自定义路由算法,让需有序的消息进入同一队列
- 每个队列只能由一个消费者消费
- 消费者内部维护多个阻塞队列,把同一业务 ID 消息投递到同一个阻塞队列,每个阻塞队列单线程串行,多个阻塞队列并行处理