抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ消费流程

消息消费总体流程

消息消费的总体流程如下图:

img

在Consumer端,参与消息消费管理的模块主要有以下4个:

PullMessageService模块

​ PullMessageService模块可以理解为消息拉取的驱动器,采用了任务队列和生产者-消费者模式将消息拉取请求和消息拉取执行过程异步解耦。其中包含了两个组件:

消息拉取任务队列

​ pullRequestQueue是一个阻塞队列。需要执行一次消息拉取任务时,只需要将一个消息拉取请求(PullRequest)对象放入队列中就可以了。具体的实现方法是:调用PullMessageService类的executePullRequestImmediately函数。

​ 队列中的每个消息拉取请求对象包含一个本地消息缓存队列(ProcessQueue),用于缓存拉取到的消息。

消息拉取线程

​ 消息拉取线程不断尝试从任务队列中取出消息拉取请求并执行。

Rebalance模块

Rebalance模块可以理解为消息拉取任务的启动者和任务数量的调节者。

​ Consumr刚启动时,PullMessageService中的消息拉取任务队列是空的。第一次执行负载均衡操作时,Rebalance模块计算得到消息拉取任务列表,填充到消息拉取任务队列,启动消息拉取循环。

​ 系统运行过程中,由于扩容或缩容导致的负载均衡发生变化时,Rebalance模块会负责对任务队列中的任务数量进行增加或删除。

PushConsumer模块

​ PushConsumer模块可以理解为整个消息消费流程(包括消息拉取步骤和消息消费步骤)的协调者。

ConsumerMessageService模块

ConsumerMessageService模块可以理解为消息消费步骤的执行者。

​ 在ConsumerMessageService中包含了一个线程池,用于进行多线程并行消费。线程池的核心线程数缺省值为20,可以通过DefaultMQPushConsumer类的setConsumeThreadMin函数修改,最大允许值为1000;线程池最大线程数缺省值为64,可以通过DefaultMQPushConsumer类的setConsumeThreadMax函数修改,最大允许值为1000。

消息消费过程每一步流程介绍如下:

初始化消息拉取

Step1~Step2:Rebalance模块初始化消息拉取任务列表

​ 经过Rebalance运算,Consumer分配得到若干ConsumeQueue。Rebalance会为每个ConsumeQueue创建一个对应的PullRequest对象,添加到pullRequestQueue。

​ 在实际代码中,Rebalance模块和PullMessageService没有直接关联,是通过调用PushConsumer的executePullRequestImmediately接口来初始化消息拉取任务列表的。为了强调因果关系,在Step2中表示为由Rebalance模块向消息拉取任务队列填充数据。

拉取消息

Step3:消息拉取线程委托PushConsumer进行一次消息拉取

​ 当第一个PullRequest对象被添加到阻塞队列pullRequestQueue后,消息拉取线程从阻塞状态转变为运行状态,委托PushConsumer进行一次消息拉取。

长轮询拉取

Step4:长轮询是一个异步过程,如果有消息返回或者长轮询超时,PushConsumer的回调函数(PullCallback#onSuccess)会被调用。

长轮询结果返回

Step5:长轮询返回结果时,PushConsumer端的PullCallback#onSuccess回调函数会被调用。

​ 一次消息拉取能够返回一批消息。拉取消息批量值由DefaultMQPushConsumer类的pullBatchSize成员变量控制。其含义是:一次拉取最多希望返回多少条信息。如果Broker端等待消费的消息比较少,实际拉取到的消息数量可能小于该值。拉取消息批量值缺省值为32,Consumer可以通过setPullBatchSize函数修改其值,最大允许值为1024。

消息消费

Step6:将消息暂时存放在本地内存,等待推送到消费者

消息暂存

Step6.1:将拉取得到的消息存放到本地消息缓存

​ 在这里又一次使用了生产者-消费者模式将消息的拉取和消费过程进行异步解耦。

​ Consumer内的每一个ConsumeQueue被分配了一个对应的ProcessQueue对象作为本地消息缓存容器。

消费消息

Step6.2:PushConsumer委派ConsumeMessageService消费消息

​ 通过调用ConsumeMessageService类的submitConsumeRequest方法进行消息的消费。

拉取消息

Step6.3:PushConsumer发起下一次长轮询

​ 每次长轮询返回后,不论是否从Broker拉取到了消息,PushConsumer都会向PullMessageService的消息拉取任务队列里添加一个消息拉取请求,继续下一次消息拉取长轮询。

​ 长轮询由Rebalance触发启动后,后续的长轮询循环由PushConsumer负责维护。只要负载均衡不发生变化,每完成一次长轮询,PushConsumer都会向消息拉取任务队列里添加下一次长轮询对应的消息拉取请求,如此不断循环进行。

回调消费消息

Step7.1~Step7.2:ConsumeMessageService调用上层应用的回调函数消费消息

​ 一个拉取批量的消息会被切分成若干个消费批量进行消费。ConsumeMessageService会为每个消费批量创建一个ConsumeRequest对象,并将该对象指派给线程池内分配一个线程进行消息消费。例如:如果拉取批量值为32,消费批量值为4,一次拉取得到32个消息后,ConsumeMessageService会分配8个线程进行消息消费,每个线程消费4个消息。消费消息批量值由DefaultMQPushConsumer的consumeMessageBatchMaxSize成员变量控制,缺省值为1,Consumer可以通过setConsumeMessageBatchMaxSize函数修改其值,最大允许值为1024。

​ 消息消费的主要工作是:调用上层应用的回调函数,并处理消费结果。如果消费结果异常,需要进行相关的异常处理。消息消费异常处理流程会在本文的后面的“消费异常处理”章节进行详细介绍。

消费进度管理

虽然集群消费模式的消费进度是集中保存在Broker的,为了提高效率,Consumer端也缓存了一份消费进度信息。两者之间通过一定的机制进行同步。

img

​ 如上图所示,在Broker端,消费进度信息保存在ConsumerOffsetManager类的offsetTable表中,表中的信息会定时保存到consumerOffset.json文件中。该文件的缺省保存位置为:~\store\config。在Consumer端, RemoteBrokerOffsetStore类的offsetTable表用于保存消费进度的本地缓存。两边的offsetTable之间信息的同步方式不是实时同步,在Consumer或Broker异常退出时会发生消费进度滞后和重复消费的情况。根据我们在前面介绍的幂等消费约定,重复消费不会导致上层应用发生异常。

消费进度初始化流程

img

如上图所示,消费进度初始化流程为:

文件加载消费进度

Step1:Broker从consumerOffset.json文件加载消费进度

​ 在Broker启动阶段,会从consumerOffset.json文件中加载消费进度信息到offsetTable表。

拉取消费进度信息

Step2~Step3:Consumer从Broker端拉取消费进度信息

​ Consumer启动后第一次进行Rebalance操作时,通过发送QUERY_CONSUMER_OFFSET消息从Broker端拉取消费进度信息作为消费进度的初始值。该值用于从Broker拉取消息时指定拉取起始位置。

​ 客户端在订阅消息时,设置第一次进行消息消费的起始位置有三个选项:CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_LAST_OFFSET和CONSUME_FROM_TIMESTAMP。这三个选项只有在consumeOffset.json文件还没有生成,即Broker第一次启动时才有意义。一旦Broker已经生成了consumeOffset.json文件,Consumer第一次进行消息消费的起始位置是从这个文件中获取的。

初始化消费进度

Step4~Step5: Consumer使用消费进度初始值从Broker进行第一次消息拉取

消费进度更新流程

img

消费消息

Step1~Step2:Consumer从Broker拉取到一批消息,并进行消费

消费进度缓存

Step3:消息消费成功后,更新Consumer本地的消费进度缓存表offsetTable

消费进度同步流程

Consumer端的消费进度缓存信息通过两种方式同步到Broker端:长轮询请求“捎带”和定时同步。

通过长轮询请求“捎带”

img

Consumer每次发起长轮询请求从Broker拉取消息时,都会利用这次通信过程将本地缓存的消费进度“捎带”给Broker。

Consumer从Broker拉取消息

Step1~Step2进行Consumer向Broker发出长轮询拉取消息请求时,将请求头的SysFlag字段的FLAG_COMMIT_OFFSET标志位置1,commitOffset字段设置为本地缓存的消费进度值。

Broker更新消费进度

Step3进行Broker在处理长轮询返回消息后,更新消费进度信息表offsetTable。

定时同步

img

消费进度同步

Step1: Consumer定时发送消费进度同步信息给Broker

​ 每隔5秒,Consumer会进行一次消费进度同步操作。同步操作由MQClientInstance发起,对Consumer分配得到的每个ConsumeQueue分别发送一个UPDATE_CONSUMER_OFFSET消息给Broker。

更新消费进度

Step2: Broker更新消费进度表

​ ConsumerManageProcessor接收到UPDATE_CONSUMER_OFFSET消息后,将消费进度更新到消费进度表offsetTable。

Broker持久化消费进度流程

每隔5秒,BrokerController的后台定时任务执行线程会调用ConsumerOffsetManager类的persist方法,将消费进度信息持久化到consumerOffset.json文件。

img

消费异常处理

对于并行消费模式,上层应用的消息消费回调函数原型为:

1
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt>msgs, final ConsumeConcurrentlyContextcontext);

结果返回

一次回调函数的调用过程可以传入一组待消费消息, 该回调函数有两个渠道返回结果给调用者:

​ 函数返回值,可以使用枚举值CONSUME_SUCCESS或RECONSUME_LATER表示传入的所有消息消费成功或需要重试。

函数的参数context

该参数的类型定义为:

1
2
3
4
5
6
7
8
9
10
11
12
public class ConsumeConcurrentlyContext {
private final MessageQueue messageQueue;
/**
* Messageconsume retry strategy<br>
* -1,no retry,put into DLQdirectly<br>
* 0,brokercontrol retry frequency<br>
* >0,clientcontrol retry frequency
*/
private int delayLevelWhenNextConsume = 0;
private int ackIndex = Integer.MAX_VALUE;

}

​ 其中,成员变量ackIndex的含义是:最后一个正常消费的消息索引号。索引号的最大值为传入的消息数量-1。通过context.setAckIndex(n),可以说明哪个索引号之后的消息消费失败。

​ 另外,delayLevelWhenNextConsume成员变量用于指定消费失败后,回送消息到Broker的重试消息队列的等待延时级别。设置为-1:不重试直接回送到死信队列;设置为0:由Broker指定等待延时级别;设置为大于0的整数:由回调函数指定等待延时级别。

异常及处理方法

可能发生的异常及处理方法:

没有一个消息能够消费成功

​ 回调函数返回RECONSUME_LATER。本次回调传入的所有消息会被发回到Broker的重试队列或死信队列。

部分消息消费成功

​ 回调函数返回CONSUME_SUCCESS,并且在回调函数中调用context. setAckIndex(n),消费失败的消息会被发回到Broker的重试队列或死信队列。

回调函数执行超时或一直被阻塞

​ 并行消费模式下,一个消息消费回调函数执行超时(超过15分钟)或一直阻塞,并不会马上阻塞住后续的消息消费,但是会带来一些副作用:

消费进度停止增长。

​ 消费进度增长过程中,不会跳过没有完成消费的消息。

导致流控持续一段时间。

​ Consumer消息消费的流控规则中,有一条规则:如果消息本地缓存中的消息跨度大于2000,触发流控,等待50毫秒后重试拉取消息。

​ 如果一次消息消费被阻塞,后续的消费虽然可以继续往下走,但是本地消息缓存中的消息跨度(缓存中的最后一个消息和第一个消息的queueOffset之间的差值)会不断增长,直到触发流控为止。

​ 为了避免回调函数阻塞导致消息消费流控一直持续,RocketMQ采用了消息定期清理机制清理消费过程中发生阻塞的消息。

​ 每隔15分钟,ConsumeMessageConcurrentlyService类的后台定时任务执行线程会发起一次消费被阻塞的消息清理操作,调用ProcessQueue的cleanExpiredMsg方法,按照顺序一次最多清理16个开始消费后超过15分钟还没有结束的消息。清理的同时,将这些消息发回给Broker的重试队列。

评论