RocketMQ消费流程
消息消费总体流程
消息消费的总体流程如下图:
在Consumer端,参与消息消费管理的模块主要有以下4个:
PullMessageService模块
PullMessageService模块可以理解为消息拉取的驱动器,采用了任务队列和生产者-消费者模式将消息拉取请求和消息拉取执行过程异步解耦。其中包含了两个组件:
消息拉取任务队列
pullRequestQueue是一个阻塞队列。需要执行一次消息拉取任务时,只需要将一个消息拉取请求(PullRequest)对象放入队列中就可以了。具体的实现方法是:调用PullMessageService类的executePullRequestImmediately函数。
队列中的每个消息拉取请求对象包含一个本地消息缓存队列(ProcessQueue),用于缓存拉取到的消息。
消息拉取线程
消息拉取线程不断尝试从任务队列中取出消息拉取请求并执行。
Rebalance模块
Rebalance模块可以理解为消息拉取任务的启动者和任务数量的调节者。
Consumr刚启动时,PullMessageService中的消息拉取任务队列是空的。第一次执行负载均衡操作时,Rebalance模块计算得到消息拉取任务列表,填充到消息拉取任务队列,启动消息拉取循环。
系统运行过程中,由于扩容或缩容导致的负载均衡发生变化时,Rebalance模块会负责对任务队列中的任务数量进行增加或删除。
PushConsumer模块
PushConsumer模块可以理解为整个消息消费流程(包括消息拉取步骤和消息消费步骤)的协调者。
ConsumerMessageService模块
ConsumerMessageService模块可以理解为消息消费步骤的执行者。
在ConsumerMessageService中包含了一个线程池,用于进行多线程并行消费。线程池的核心线程数缺省值为20,可以通过DefaultMQPushConsumer类的setConsumeThreadMin函数修改,最大允许值为1000;线程池最大线程数缺省值为64,可以通过DefaultMQPushConsumer类的setConsumeThreadMax函数修改,最大允许值为1000。
消息消费过程每一步流程介绍如下:
初始化消息拉取
Step1~Step2:Rebalance模块初始化消息拉取任务列表
经过Rebalance运算,Consumer分配得到若干ConsumeQueue。Rebalance会为每个ConsumeQueue创建一个对应的PullRequest对象,添加到pullRequestQueue。
在实际代码中,Rebalance模块和PullMessageService没有直接关联,是通过调用PushConsumer的executePullRequestImmediately接口来初始化消息拉取任务列表的。为了强调因果关系,在Step2中表示为由Rebalance模块向消息拉取任务队列填充数据。
拉取消息
Step3:消息拉取线程委托PushConsumer进行一次消息拉取
当第一个PullRequest对象被添加到阻塞队列pullRequestQueue后,消息拉取线程从阻塞状态转变为运行状态,委托PushConsumer进行一次消息拉取。
长轮询拉取
Step4:长轮询是一个异步过程,如果有消息返回或者长轮询超时,PushConsumer的回调函数(PullCallback#onSuccess)会被调用。
长轮询结果返回
Step5:长轮询返回结果时,PushConsumer端的PullCallback#onSuccess回调函数会被调用。
一次消息拉取能够返回一批消息。拉取消息批量值由DefaultMQPushConsumer类的pullBatchSize成员变量控制。其含义是:一次拉取最多希望返回多少条信息。如果Broker端等待消费的消息比较少,实际拉取到的消息数量可能小于该值。拉取消息批量值缺省值为32,Consumer可以通过setPullBatchSize函数修改其值,最大允许值为1024。
消息消费
Step6:将消息暂时存放在本地内存,等待推送到消费者
消息暂存
Step6.1:将拉取得到的消息存放到本地消息缓存
在这里又一次使用了生产者-消费者模式将消息的拉取和消费过程进行异步解耦。
Consumer内的每一个ConsumeQueue被分配了一个对应的ProcessQueue对象作为本地消息缓存容器。
消费消息
Step6.2:PushConsumer委派ConsumeMessageService消费消息
通过调用ConsumeMessageService类的submitConsumeRequest方法进行消息的消费。
拉取消息
Step6.3:PushConsumer发起下一次长轮询
每次长轮询返回后,不论是否从Broker拉取到了消息,PushConsumer都会向PullMessageService的消息拉取任务队列里添加一个消息拉取请求,继续下一次消息拉取长轮询。
长轮询由Rebalance触发启动后,后续的长轮询循环由PushConsumer负责维护。只要负载均衡不发生变化,每完成一次长轮询,PushConsumer都会向消息拉取任务队列里添加下一次长轮询对应的消息拉取请求,如此不断循环进行。
回调消费消息
Step7.1~Step7.2:ConsumeMessageService调用上层应用的回调函数消费消息
一个拉取批量的消息会被切分成若干个消费批量进行消费。ConsumeMessageService会为每个消费批量创建一个ConsumeRequest对象,并将该对象指派给线程池内分配一个线程进行消息消费。例如:如果拉取批量值为32,消费批量值为4,一次拉取得到32个消息后,ConsumeMessageService会分配8个线程进行消息消费,每个线程消费4个消息。消费消息批量值由DefaultMQPushConsumer的consumeMessageBatchMaxSize成员变量控制,缺省值为1,Consumer可以通过setConsumeMessageBatchMaxSize函数修改其值,最大允许值为1024。
消息消费的主要工作是:调用上层应用的回调函数,并处理消费结果。如果消费结果异常,需要进行相关的异常处理。消息消费异常处理流程会在本文的后面的“消费异常处理”章节进行详细介绍。
消费进度管理
虽然集群消费模式的消费进度是集中保存在Broker的,为了提高效率,Consumer端也缓存了一份消费进度信息。两者之间通过一定的机制进行同步。
如上图所示,在Broker端,消费进度信息保存在ConsumerOffsetManager类的offsetTable表中,表中的信息会定时保存到consumerOffset.json文件中。该文件的缺省保存位置为:~\store\config。在Consumer端, RemoteBrokerOffsetStore类的offsetTable表用于保存消费进度的本地缓存。两边的offsetTable之间信息的同步方式不是实时同步,在Consumer或Broker异常退出时会发生消费进度滞后和重复消费的情况。根据我们在前面介绍的幂等消费约定,重复消费不会导致上层应用发生异常。
消费进度初始化流程
如上图所示,消费进度初始化流程为:
文件加载消费进度
Step1:Broker从consumerOffset.json文件加载消费进度
在Broker启动阶段,会从consumerOffset.json文件中加载消费进度信息到offsetTable表。
拉取消费进度信息
Step2~Step3:Consumer从Broker端拉取消费进度信息
Consumer启动后第一次进行Rebalance操作时,通过发送QUERY_CONSUMER_OFFSET消息从Broker端拉取消费进度信息作为消费进度的初始值。该值用于从Broker拉取消息时指定拉取起始位置。
客户端在订阅消息时,设置第一次进行消息消费的起始位置有三个选项:CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_LAST_OFFSET和CONSUME_FROM_TIMESTAMP。这三个选项只有在consumeOffset.json文件还没有生成,即Broker第一次启动时才有意义。一旦Broker已经生成了consumeOffset.json文件,Consumer第一次进行消息消费的起始位置是从这个文件中获取的。
初始化消费进度
Step4~Step5: Consumer使用消费进度初始值从Broker进行第一次消息拉取
消费进度更新流程
消费消息
Step1~Step2:Consumer从Broker拉取到一批消息,并进行消费
消费进度缓存
Step3:消息消费成功后,更新Consumer本地的消费进度缓存表offsetTable
消费进度同步流程
Consumer端的消费进度缓存信息通过两种方式同步到Broker端:长轮询请求“捎带”和定时同步。
通过长轮询请求“捎带”
Consumer每次发起长轮询请求从Broker拉取消息时,都会利用这次通信过程将本地缓存的消费进度“捎带”给Broker。
Consumer从Broker拉取消息
Step1~Step2进行Consumer向Broker发出长轮询拉取消息请求时,将请求头的SysFlag字段的FLAG_COMMIT_OFFSET标志位置1,commitOffset字段设置为本地缓存的消费进度值。
Broker更新消费进度
Step3进行Broker在处理长轮询返回消息后,更新消费进度信息表offsetTable。
定时同步
消费进度同步
Step1: Consumer定时发送消费进度同步信息给Broker
每隔5秒,Consumer会进行一次消费进度同步操作。同步操作由MQClientInstance发起,对Consumer分配得到的每个ConsumeQueue分别发送一个UPDATE_CONSUMER_OFFSET消息给Broker。
更新消费进度
Step2: Broker更新消费进度表
ConsumerManageProcessor接收到UPDATE_CONSUMER_OFFSET消息后,将消费进度更新到消费进度表offsetTable。
Broker持久化消费进度流程
每隔5秒,BrokerController的后台定时任务执行线程会调用ConsumerOffsetManager类的persist方法,将消费进度信息持久化到consumerOffset.json文件。
消费异常处理
对于并行消费模式,上层应用的消息消费回调函数原型为:
1 | ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt>msgs, final ConsumeConcurrentlyContextcontext); |
结果返回
一次回调函数的调用过程可以传入一组待消费消息, 该回调函数有两个渠道返回结果给调用者:
函数返回值,可以使用枚举值CONSUME_SUCCESS或RECONSUME_LATER表示传入的所有消息消费成功或需要重试。
函数的参数context
该参数的类型定义为:
1 | public class ConsumeConcurrentlyContext { |
其中,成员变量ackIndex的含义是:最后一个正常消费的消息索引号。索引号的最大值为传入的消息数量-1。通过context.setAckIndex(n),可以说明哪个索引号之后的消息消费失败。
另外,delayLevelWhenNextConsume成员变量用于指定消费失败后,回送消息到Broker的重试消息队列的等待延时级别。设置为-1:不重试直接回送到死信队列;设置为0:由Broker指定等待延时级别;设置为大于0的整数:由回调函数指定等待延时级别。
异常及处理方法
可能发生的异常及处理方法:
没有一个消息能够消费成功
回调函数返回RECONSUME_LATER。本次回调传入的所有消息会被发回到Broker的重试队列或死信队列。
部分消息消费成功
回调函数返回CONSUME_SUCCESS,并且在回调函数中调用context. setAckIndex(n),消费失败的消息会被发回到Broker的重试队列或死信队列。
回调函数执行超时或一直被阻塞
并行消费模式下,一个消息消费回调函数执行超时(超过15分钟)或一直阻塞,并不会马上阻塞住后续的消息消费,但是会带来一些副作用:
消费进度停止增长。
消费进度增长过程中,不会跳过没有完成消费的消息。
导致流控持续一段时间。
Consumer消息消费的流控规则中,有一条规则:如果消息本地缓存中的消息跨度大于2000,触发流控,等待50毫秒后重试拉取消息。
如果一次消息消费被阻塞,后续的消费虽然可以继续往下走,但是本地消息缓存中的消息跨度(缓存中的最后一个消息和第一个消息的queueOffset之间的差值)会不断增长,直到触发流控为止。
为了避免回调函数阻塞导致消息消费流控一直持续,RocketMQ采用了消息定期清理机制清理消费过程中发生阻塞的消息。
每隔15分钟,ConsumeMessageConcurrentlyService类的后台定时任务执行线程会发起一次消费被阻塞的消息清理操作,调用ProcessQueue的cleanExpiredMsg方法,按照顺序一次最多清理16个开始消费后超过15分钟还没有结束的消息。清理的同时,将这些消息发回给Broker的重试队列。