抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ事务消息

分布式事务

微服务倡导将复杂的系统拆分为若干个简单、职责单一、松耦合的服务,可以降低开发难度,便于敏捷开发。而对大多数中小型公司来说,实施微服务架构面临以下困难:

  • 单体应用拆分为分布式系统后,应用间的通讯和故障处理机制变得复杂
  • 微服务化后,一个简单的功能需要调用多个服务并操作多个数据库实现,数据一致性难以保障
  • 大量的微服务,导致其测试、维护、部署变得困难

为了保障微服务架构下数据的一致性,通常需要引入分布式事务来解决,当前比较流行的分布式解决方案如下。

基于二阶段提交的XA协议

  • 第一阶段:协调者询问所有参与者是否可以执行提交操作,参与者执行准备工作,例如为资源上锁,预留资源,写undo/redo log。

  • 第二阶段:若所有参与者回应“可提交”,则向所有参与者发送正式提交命令;若某个参与者回应“拒绝提交”,则向所有参与者发送回滚命令。

XA协议保障了事务的强一致性,然而由于其采用的阻塞协议带来的巨大性能开销,难以达到较高的系统吞吐量。

TCC模式

TCC提供了一种全局事务解决方案,业务系统只需实现下面三个操作,即可完成分布式事务:

  • TRY:完成参与者业务检查并预留业务资源
  • CONFIRM:使用TRY阶段的预留业务资源,并执行业务
  • CANCEL:释放TRY结算预留的业务资源

​ TCC模式可以让业务更灵活地定义数据库操作的粒度,使得降低锁冲突、提高吞吐量成为可能,然而它对业务的侵入度较高,实现难度较大。

事务消息

消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。发送prepare消息,该消息对Consumer不可见

事务消息发送及提交

  1. 发送消息(half消息);
  2. 服务端响应消息写入结果;
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。

事务消息的补偿流程

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新Commit或RollBack 其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。

事务消息状态

事务消息共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。

消息类型对比

Topic的消息类型 是否支持事务消息 是否支持定时/延时消息 性能
无序消息(普通、事务、定时/延时消息) 最高
分区顺序消息
全局顺序消息 一般

发送方式对比

消息类型 是否支持同步发送 是否支持异步发送 是否支持单向发送
无序消息(普通、事务、定时/延时消息) 最高
分区顺序消息
全局顺序消息

RocketMq事务消息

实现思想

​ RocketMQ 事务消息,是指发送消息事件和其他事件需要同时成功或同失败。比如银行转账, A 银行的某账户要转一万元到 B 银行的某账户。A 银 行发送“B 银行账户增加一万元” 这个消息,要和“从 A 银行账户扣除一万元”这个操作同时成功或者同时失败。

​ RocketMQ 采用两阶段提交的方式实现事务消息,TransactionMQProducer 处理上面情况的流程是,先发一个“准备从 B 银行账户增加一万元”的消息,发送成功后做从 A 银行账户扣除一万元的操作 ,根据操作结果是否成功,确定之前的“准备从 B 银行账户增加一万元”的消息是做 commit 还是 rollback ,

RocketMQ 实现的具体流程如下:

  1. 发送方向 RocketMQ 发送“待确认”(Prepare)消息。

  2. RocketMQ 将收到的“待确认”(一般写入一个 HalfTopic 主题)消息持化成功后, 向发送方回复消息已经发送成功,此时第一阶段消息发送完成,发送方开始执行本地事件逻辑.

  3. 发送方根据事件执行结果向 RocketMQ 发送二次确认( Commit 还是 Rollback)消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些 消息才会进入生产时发送实际的主题 RealTopic),订阅方将能够收到该消息;收到 Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息。

  4. 如果出现异常情况,步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求.

  5. 发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ), 通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。

使用约束

  1. 消息事务不支持定时和批量。
  2. 为了避免一个消息被多次检查,导致半数队列消息堆积,我们限制单个消息的默认检查次数为15次,但用户可以改变这个限制通过修改broker的配置文件中的 transactionCheckMax参数。如果一个消息检查次数超过transactionCheckMax,默认情况下,broker将会丢弃这个消息并同时打印错误日志。用户可以改变这种行为通过覆盖 AbstractTransactionCheckListener 类。
  3. 由broker的配置文件中参数 transactionTimeou t决定的特点时间段之后检查事务消息。当发送事务消息时,通过设置用户配置CHECK_IMMUNITY_TIME_IN_SECONDS,用户也可以改变这个限制。这个参数优先于 transactionMsgTimeout 参数。
  4. 一个事务消息可能被检查或消费多次。
  5. 提交过的消息重新放到用户目标主题可能会失败。目前,它依赖日志记录。通过RocketMQ自身高可用机制确保高可用。如果你想确保事务消息不丢失并且保证事务完整性,建议使用同步双写机制。
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。不像其他类型消息,事务消息允许回查。MQ server通过生产者ID查询客户端。

事务流程

客户端事务消息发送

​ 发送prepare消息复用了普通消息发送,只是给消息增加了TRAN_MSG=true的属性,该属性决定prepare消息对Consumer不可见

消息写入CommitLog

​ 事务消息的CommitLog写入和普通消息一致,它利用文件的顺序写来提升吞吐量,并采用文件映射的方式降低系统开销。

消息写入ConsumeQueue

​ ConsumeQueue的写入是采用异步方式完成的,ReputMessageSerivce作为一个长驻线程负责查询索引的构造和ConsumeQueue的写入,对于Prepare/Rollback消息不会写ConsumeQueue,从而保证它们对Consumer不可见

Broker端事务提交/回滚

​ Broker收到提交/回滚指令后,首先从根据offset从CommitLog读出原有的prepare消息,构造新的消息(修改事务状态标识)并写入Broker。对于一条事务消息,RocketMq会存储两条消息,存在一定资源浪费。其实它是为了保证随后的消费者能尽可能从PageCache中读到该消息,而不是读取较早的prepare消息(可能导致缺页中断),以提升系统吞吐量。

​ 此外,rocketmq的最新版本(4.2.0)尚未支持本地事务的状态回查,这样可能存在由于网络抖动,导致commit/rollback未提交到broker导致prepare消息长期悬挂的风险。

​ 在RocketMq的设计文档中,为事务消息增加了一张事务状态表,它维护了消息的Offset、事务状态(P/C/R)信息。可以采用如下思路实现事务消息的回查机制:

  • 在prepare消息写入commitLog后,可以通过CommitLogDispatcher写入一条事务状态记录(state=P)

  • 在提交/回滚事务时,根据transactionId找到对应的事务状态记录,并修改对应的事务状态

  • 通过长驻线程扫描事务状态表,对于超过一定时间的Prepare事务,发起对业务方的事务状态回查,根据回查结果修改事务状态,并向brokder发送相应的Commit/Rollback消息。

任务回查

​ RocketMQ 通过 TransactionalMessageCheckService 线程定时去检测 RMQ_SYS_ TRANS_ HALF_TOPIC 主题中的消息,回查消息的事务状态 TransactionalMessageCheckService 的检测频率默认为 1 分钟,可通过在 broker.conf 文件中设置 transactionChecklnterval 来改变默认值,单位为毫秒。

评论