几种 MQ 顺序消息的实现方式

本文最后更新于:2021年11月27日 下午

顺序消息的必要性

消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。
例如:通过 mysql binlog 进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入 -> 更新 -> 删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除 -> 插入 -> 更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
又比如:交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款。

Kafka

基本概念

Topic

Topic[1]:Kafka 对消息进行归类,发送到集群的每一条消息都要指定一个 topic。

Partition

物理上的概念,每个 topic 包含一个或多个 partition,一个 partition 对应一个文件夹,这个文件夹下存储 partition 的数据和索引文件,每个 partition 内部是有序的。

Topic & Partition

kafka_groups

一个 topic 为一类消息,每条消息必须指定一个 topic。物理上,一个 topic 分成一个或多个 partition,每个 partition 有多个副本分布在不同的 broker 中,如下图。

kafka-broker-partition-replication

每个 partition 在存储层面是一个 append log 文件,发布到此 partition 的消息会追加到 log 文件的尾部,为顺序写入磁盘。每条消息在 log 文件中的位置称为 offset(偏移量),offset 为一个 long 型数字,唯一标记一条消息。如下图:

kafka partition offset

每个消费者唯一保存的元数据是 offset 值,这个位置完全为消费者控制,因此消费者可以采用任何顺序来消费记录,如上图。

顺序消息实践

Kafka 对消息顺序的保障

Kafka 会在同一个 partition 内保障消息顺序,如果 Topic 存在多个 partition 则无法确保全局顺序。如果需要保障全局顺序,则需要控制 partition 数量为 1 个。

Kafka 顺序消息的限制
  • 从生产者的角度来看,向不同的 partition 写入是完全并行的;从消费者的角度来看,并发数完全取决于 partition 的数量(如果 consumer 数量大于 partition 数量,则必有 consumer 闲置)。因此配置多个 partition 数量对于发挥 Kafka 并发的性能十分重要。因为顺序消息只能配置单个 partition,所以其并发性能提升也是比较困难的。
  • 单个 partition 只能被同消费者组的 单个消费者 进程消费。
  • 单个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。

RabbitMQ

基本概念

Queue

Queue(队列)[2]是具有两个主要操作的 顺序数据结构: 一个项目可以在尾部入队(添加),从头部出队(消费)。队列在消息传递技术领域扮演着重要的角色: 许多消息传递协议和工具都假定发布者和消费者使用队列类存储机制进行通信。

RabbitMQ 中的队列是 FIFO(先进先出)。一些队列特性,即消费者的优先级和重新排队,会影响消费者所观察到的排序。

顺序消息实践

RabbitMQ 中的 queue 是有序的消息集合。消息以 FIFO 方式进行排队和出队列(交付给消费者)。

FIFO 排序不保证优先级(priority)队列和分片队列(sharded queues)。所以,只要配置普通 queue,不要配置优先级队列和分片队列,那么队列中的消息就是顺序消息。

多个相互竞争的消费者(consumers)、消费者优先级(consumer priorites)、消息重新传递(message redeliveries)也会影响排序。所以,如果要顺序消费消息,只能有一个 Consumer。

总结一下,要实现 RabbitMQ 顺序消息,配置一个 Queue 对应一个 Consumer,把需要保证顺序的 message 都发送到这一个 Queue 当中,关闭 autoackprefetchCount=1,每次只消费一条信息,处理过后进行手工 ack,然后接收下一条 message,只由一个 Consumer 进行处理。

RocketMQ

基本概念

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过 同一个 消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的[3]

严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

顺序消息实践

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息 只依次发送到同一个 queue 中 ,消费的时候 只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个 OrderId 获取到的肯定是同一个队列。

顺序消息生产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* Producer,发送顺序消息
*/
public class Producer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // 根据订单 id 选择发送 queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());// 订单 id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}

producer.shutdown();
}

/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 推送 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

return orderList;
}
}

顺序消息消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* 顺序消息消费,带事务方式(应用可控制 Offset 什么时候提交)
*/
public class ConsumerInOrder {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费 <br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个 queue 有唯一的 consume 线程来消费, 订单对每个 queue(分区) 有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}

try {
// 模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

顺序消息消费建议

消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。

Apache Pulsar

概念

Topic 与分区

Topic(主题)[4]是某一种分类的名字,消息在 Topic 中可以被存储和发布。生产者往 Topic 中写消息,消费者从 Topic 中读消息。

Pulsar 的 Topic 分为 Partitioned Topic 和 Non-Partitioned Topic 两类,Non-Partitioned Topic 可以理解为一个分区数为 1 的 Topic。实际上在 Pulsar 中,Topic 是一个虚拟的概念,创建一个 3 分区的 Topic,实际上是创建了 3 个「分区 Topic」,发给这个 Topic 的消息会被发往这个 Topic 对应的多个 「分区 Topic」。
例如:生产者发送消息给一个分区数为 3,名为 my-topic 的 Topic,在数据流向上是均匀或者按一定规则(如果指定了 key)发送给了 my-topic-partition-0my-topic-partition-1my-topic-partition-2 三个「分区 Topic」。

分区 Topic 做数据持久化时,分区是逻辑上的概念,实际存储的单位是分片(Segment)。

如下图所示,分区 Topic1-Part2 的数据由 N 个 Segment 组成, 每个 Segment 均匀分布并存储在 Apache BookKeeper 群集中的多个 Bookie 节点中, 每个 Segment 具有 3 个副本。

Pulsar topic partition 和 segment

消息类型

在消息队列中,根据消息的特性及使用场景,可以将消息作如下分类:

消息类型 消费顺序 性能 适用场景
普通消息 无顺序 最好 吞吐量巨大,且对生产和消费顺序无要求
局部顺序消息 同一分区下所有消息遵循先入先出(FIFO)规则 较好 吞吐量较大,同一分区内有序,不同分区内无序
全局顺序消息 同一 Topic 下所有消息遵循先入先出(FIFO)规则 一般 吞吐量一般,全局有序,单分区
死信消息 - - 无法正常消费的消息

普通消息

普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率,在吞吐量巨大时其性能最好。

局部顺序消息

局部顺序消息相较于普通消息类型,多了一个局部有顺序的特性。即同一个分区下,其消费者在消费消息的时候,严格按照生产者投递到该分区的顺序进行消费。局部顺序消息在保证了一定顺序性的同时,保留了分区机制提升性能。但局部顺序消息不能保证不同分区之间的顺序。

全局顺序消息

全局顺序消息最大的特性就在于,严格保证消息是按照生产者投递的顺序来消费的。所以其使用的是单分区来处理消息,用户不可自定义分区数,相比前两种消息类型,这种类型消息的性能较低。

死信消息

死信消息是指无法被正常消费的消息。TDMQ Pulsar 版会在创建新的订阅(消费者确定了与某个 Topic 的订阅关系)时自动创建一个死信队列用于处理这种消息。

顺序消息实践

Producer:

发送者保证消息的顺序性其实是比较简单的:

一种选择:利用 单队列 发送:

  • 一个业务对应一个队列
  • 一个队列只能由一个消费者监听消费

另一种选择:利用 Pulsar 的分区 Topic

  • producer 发送消息时需要指定 key 属性(如 key 为订单号),Pulsar 自动会根据 Key 值将消息分配到指定的分区中
  • 支持多个消费者消费,多个消费者可以监听同一个分区,但是相同的 Key 只会分配给同一个消费者

Consumer:

消费者保证消息的顺序性有下面两种思路:

一种是:单线程执行

  • 单线程执行保证了消费的顺序性
  • 消费效率低

另一种是:并发消费

  • Producer 发送的消息体中,需指定 key,消费者根据分区、key 定位到对应的线程.

参考资料


几种 MQ 顺序消息的实现方式
https://ewhisper.cn/posts/60020/
作者
东风微鸣
发布于
2021年10月1日
许可协议