抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ消费消息

前言

如果横向比较RocketMQ中的各功能模块哪个最复杂,消息消费管理模块无疑是胜者。

导致消息消费管理模块复杂度高的原因主要有以下几点:

  1. RocketMQ支持在线扩容和缩容,Broker或Consumer的数量变化后需要进行动态负载均衡。
  2. RocketMQ支持三个不同维度上进行分类的消费模式:广播消费/集群消费;拉消息/推消息(长轮询);顺序消费/并行消费。
  3. 为了优化性能,消息消费处理过程中引入了比较多的异步并行处理。
  4. 为了实现“至少一次送达”的消息交付策略,需要对消费失败的消息进行重消费处理。

如果不另外附加说明,本文的所有内容都是基于最常用的集群消费+推消息+并行消费模式

消息消费的含义

在RoketMQ中大家通常所说的“消费”是两个步骤的统称,这两个步骤是:

  1. Consumer从Broker拉取消息到本地,并保存到本地的消息缓存队列(ProcessQueue)。这个步骤中,消费的主体是RocketMQ的Consumer模块。

  2. Consumer从本地的消息缓存队列取出消息,并调用上层应用程序指定的回调函数对消息进行处理。这个步骤中,消费的主体是上层应用程序。

不论是拉消息还是推消息模式,底层的实现都是由Consumer从Broker拉取消息。

在后文中我们分别使用“拉取“和”消费“表示这两个步骤。

幂等,幂等,幂等!

应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样。这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。

“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系。要做到不丢消息就无法避免消息重复消费。原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误。从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错。为了确保不丢消息,重发消息是唯一的选择。

​ 有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。

消息消费模式

从不同的维度划分,Consumer支持以下消费模式:

  • 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;

  • 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。

集群消费

消息队列 RocketMQ 是基于发布/订阅模型的消息系统,消息的订阅方订阅关注的 Topic,以获取并消费消息,由于订阅方应用一般是分布式系统,以集群方式部署有多台机器,因此消息队列 RocketMQ 约定以下概念。

集群消费(CLUSTERING)

使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点

​ 可以理解为同组公共消费,公共资源我拿了你就没有,即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这些消息,但我消费了这条消费你就不会再消费,消费者默认是集群消费方式,适用于大部分消息业务。

适用场景&注意事项
  • 消费端集群化部署, 每条消息只需要被处理一次。
  • 由于消费进度在服务端维护, 可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ClusterConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
广播消费模式(BROADCASTING)

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

​ 可以理解为同组各自消费,即同一 Topic 下,同一消息会被多个实例各自都消费一次,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次 。所以,广播消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。

适用场景&注意事项
  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
使用集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

使用集群模式模拟广播

适用场景&注意事项
  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。

消息消费模式

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

  • 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
  • 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。

数据交互有两种模式:Push(推模式)、Pull(拉模式)。真正的PUSH和PULL的区别:

推模式(PUSH)

我们上面使用的消费者都是PUSH模式,也是最常用的消费模式

​ 由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

原理

​ 原理是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。

​ 所以 RocketMQ 是通过“长轮询” 的方式, 同时通过 Client 端和 Server 端的配合, 达到既拥有 Pull 的优点, 又能达到确保实时性的目的。

实现方式

代码上使用 DefaultMQPushConsumer

​ consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。

代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
优缺点
优点

​ 有消息就推给消费者。延迟小,几乎可以做到实时

缺点

Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端;

  • 加大Server端的工作量,进而影响Server的性能,

  • Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。

  • 有的消费者机器配置好处理能力强,有的配置低处理能力低,但是server推相同数量级消息给消费者,就会导致消费者强的等待,弱的处理效率跟不上,从而导致崩溃。

  • server资源相比消费者的资源肯定是更宝贵

  • 总结下就是客户端慢消费(设计到io等耗时操作)时会放大缺点。

拉模式(PULL)

RocketMQ的PUSH模式是由PULL模式来实现的

​ 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

原理

​ 原理是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

​ 拉模式中,为了保证消息消费的实时性,采取了长轮询消息服务器拉取消息的方式。每隔一定时间,客户端想服务端发起一次请求,服务端有数据就返回数据,服务端如果此时没有数据,保持连接。等到有数据返回(相当于一种push),或者超时返回。

​ 长轮询Pull的好处就是可以减少无效请求,保证消息的实时性,又不会造成客户端积压,目前绝大部分的MQ都是基于的PULL模式

DefaultMQPullConsumer方式
注意

注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer

DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

image-20201216161824696

使用方式

使用方式类似,但是更加复杂,除了像推模式一样需要设置各种参数之外,还需要处理额外三件事情:

  • 获取 MessageQueues 并遍历(一个 Topic 包括多个 MessageQueue),如果是特殊情况,也可以选择指定的 MessageQueue 来读取消息。
  • 维护 Offsetstore,从一个 MessageQueue 里拉取消息时,要传入 Offset 参数,随着不断的读取消息,Offset 会不断增长。这个时候就需要用户把 Offset存储起来,根据实际的情况存入内存、 写入磁盘或者数据库中。
  • 根据不同的消息状态做不同的处理。
实现方式

代码上使用 DefaultMQPullConsumer

​ 取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class PullConsumer {

// 记录每个队列的消费进度
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();

// 获取Topic的所有队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");

//遍历所有队列
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
//拉取消息,arg1=消息队列,arg2=tag消息过滤,arg3=消息队列,arg4=一次最大拉去消息数量
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, "*", getMessageQueueOffset(mq), 32);
//从consumer中获取消息
List<MessageExt> list = pullResult.getMsgFoundList();

//消息处理
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
//将消息放入hash表中,存储该队列的消费进度
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND: // 找到消息,输出
System.out.println(pullResult.getMsgFoundList().get(0));
break;
case NO_MATCHED_MSG: // 没有匹配tag的消息
System.out.println("无匹配消息");
break;
case NO_NEW_MSG: // 该队列没有新消息,消费offset=最大offset
System.out.println("没有新消息");
break SINGLE_MQ; // 跳出该队列遍历
case OFFSET_ILLEGAL: // offset不合法
System.out.println("Offset不合法");
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

//关闭Consumer
consumer.shutdown();
}

/**
* 从Hash表中获取当前队列的消费offset
*
* @param mq 消息队列
* @return long类型 offset
*/
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;

return 0;
}

/**
* 将消费进度更新到Hash表
*
* @param mq 消息队列
* @param offset offset
*/
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
DefaultLitePullConsumer方式

该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理=

使用方式方式

代码上使用 DefaultLitePullConsumer

​ 取消息的过程需要不需要用户写,调用consumer.poll()就可以获取消息,处理完之后调用consumer.commitSync()提交偏移量即可

实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NewPullConsumer {

public static void main(String[] args) throws MQClientException {
// 创建DefaultMQPullConsumer实例
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();

try {
//循环开始消费消息
while (true) {
//从consumer中获取消息
List<MessageExt> list = consumer.poll();
//消息处理
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
//提交偏移量
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
consumer.shutdown();
}
}
}
优缺点
优点

​ 对比push优点就是消费者可以根据自己能力拉取消息处理

缺点

Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定:

  • 间隔太短就处在一个“忙等”的状态,循环空拉取造成资源浪费,浪费资源;
  • 每个Pull的时间间隔太长,Server端有消息到来有可能没有被及时处理,就会增加消息消费的延迟,影响业务使用;
  • 另外需要Client拉取消息时维护offset,代码比较麻烦。

长轮询机制

上面简要说明了Push和Pull两种消息消费方式的概念和各自特点。如果长时间没有消息,而消费者端又不停的发送Pull请求不就会导致RocketMQ中Broker端负载很高吗?那么在RocketMQ中如何解决以做到高效的消息消费呢?

​ RocketMQ的消息消费方式,采用了“长轮询”方式,兼具了Push和Pull的有点,不过需要Server和Client的配合才能够实现。

​ 即Client发送消息请求,Server端接受请求,如果发现Server队列里没有新消息,Server端不立即返回,而是持有这个请求一段时间(通过设置超时时间来实现),在这段时间内轮询Server队列内是否有新的消息,如果有新消息,就利用现有的连接返回消息给消费者;如果这段时间内没有新消息进入队列,则返回空。

​ 这样消费消息的主动权既保留在Client端,也不会出现Server积压大量消息后,短时间内推送给Client大量消息使client因为性能问题出现消费不及时的情况。

原理

​ 通过研究源码可知,RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:

  • 消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中);
  • 然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量
  • 如果条件成立则说明有新消息达到Broker端(这里,在RocketMQ的Broker端会有一个后台独立线程—ReputMessageService不停地构建ConsumeQueue/IndexFile数据,同时取出hold住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s)。
优缺点

​ 长轮询的弊端:在持有消费者请求的这段时间,占用了系统资源,因此长轮询适合客户端连接数可控的业务场景中。

批量消费

​ 消息队列RocketMQ版接收到生产者发送地消息后,不需要分开一条一条的推送给消费者,可以先将消息进行缓存,等攒够指定数量的消息或等待指定的时长后统一将缓存的这些消息推送给消费者进行批量消费。

配置方式

消息缓存的数量和等待时间分别由ConsumeMessageBatchMaxSizeBatchConsumeMaxAwaitDurationInSeconds参数控制。

  • ConsumeMessageBatchMaxSize:批量消费的最大消息数量,缓存的消息数量达到参数设置的值,消息队列RocketMQ版会将缓存的消息统一推送给消费者进行批量消费。
  • BatchConsumeMaxAwaitDurationInSeconds:批量消费的最大等待时长,等待时长达到参数设置的值,消息队列RocketMQ版会将缓存的消息统一推送给消费者进行批量消费。

应用场景

​ 若业务侧对消息吞吐量的要求优先于消息的实时性,建议使用批量消费功能。例如,给数据库中插入数据,每更新一条数据执行一次插入任务,如果数据更新较频繁,可能会对服务器造成较大压力。此时,您可以设置批量消费功能,例如,您可以设置为每10条数据批量插入一次或每5秒执行一次插入任务,降低系统运行压力。

使用限制

​ 消息队列RocketMQ版仅支持在消息获取方式为Push模式下配置批量消费功能。

示例代码

Push方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class BatchPushConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置每批次消费10条消息
consumer.setConsumeMessageBatchMaxSize(10);

//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
System.out.println("消费一批消息,消息长度:" + list.size());
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
Pull方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class BatchPullConsumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置每批次消费10条消息
consumer.setPullBatchSize(10);

//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
try {
while (true) {
List<MessageExt> list = consumer.poll();
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
System.out.println("消费一批消息,消息长度:" + list.size());
for (MessageExt ext : list) {
try {
System.out.println(new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
//提交偏移量
consumer.commitSync();
}
} finally {
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
consumer.shutdown();
}
}
}
测试

这里需要分为2种情况

  • Consumer端先启动

  • Consumer端后启动

    正常情况下:应该是Consumer需要先启动

Consumer先启动

由于这里是Consumer先启动,所以他会去轮询MQ上是否有订阅队列的消息,由于刚开始每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1),后面随着插入数据增加,就会每次消费10条。

image-20201216164334850

Producer先启动

由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer每次拉取10条 , 所以这段代码就生效了测试结果如下(每次size最多是10)

image-20201216164248246

评论