RocketMQ消息发送
消息的发送
普通消息是指消息队列 RocketMQ 中无特性的消息,区别于有特性的定时/延时消息、顺序消息和事务消息。
RocketMQ支持3种消息发送方式:
- 同步(sync)
- 异步(async)
- 单向(oneway)
单向发送
单向发送,见名知意,就是一种单方向通信方式,也就是说 producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
使用场景
由于单向发送只是简单地发送消息,不需要等待响应,也没有回调接口触发,故发送消息所耗费的时间非常短,同时也意味着消息不可靠。所以这种单向发送比较适用于那些耗时要求非常短,但对可靠性要求并不高的场景,比如说日志收集。
代码演示
调用
DefaultMQProducer
的sendOneway
方法
1 | public class OnewayProducer { |
同步发送
发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果,会在收到接收方发回响应之后才发下一个数据包的通讯方式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。
简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。
使用场景
由于这种同步发送的方式确保了消息的可靠性,同时也能及时得到消息发送的结果,故而适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等。在实际应用中,这种同步发送的方式还是用得比较多的。
注意事项
这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed
)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。
代码演示
调用
DefaultMQProducer
的send
方法
1 | public class SyncProducer { |
异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(
SendCallback
)
异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应。所以要记住一点,异步发送同样可以对消息的响应结果进行处理。
使用场景
由于异步发送不需要等待 broker 的响应,故在一些比较注重 RT(响应时间)的场景就会比较适用。比如,在一些视频上传的场景,我们知道视频上传之后需要进行转码,如果使用同步发送的方式来通知启动转码服务,那么就需要等待转码完成才能发回转码结果的响应,由于转码时间往往较长,很容易造成响应超时。此时,如果使用的是异步发送通知转码服务,那么就可以等转码完成后,再通过回调接口来接收转码结果的响应了。
注意事项
注意:RocketMQ内部只对同步模式做了重试,异步发送模式是没有自动重试的,需要自己手动实现
1 | int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; |
timesTotal 是总的重试次数
代码演示
调用
DefaultMQProducer
的send
方法
1 | public class AsyncProducer { |
发送消息小结
消息发送的权衡
下面通过一张表格,简单总结一下同步发送、异步发送和单向发送的特点。
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 | 适用场景 |
---|---|---|---|---|
同步发送 | 一般 | 有 | 不丢失 | 重要的通知场景 |
异步发送 | 快 | 有 | 不丢失 | 比较注重 RT(响应时间)的场景 |
单向发送 | 最快 | 无 | 可能丢失 | 可靠性要求并不高的场景 |
可以看到,从发送 TPS 来看,由于单向发送不需要等待响应也没有回调接口触发,发送速度非常快,一般都是微秒级的,在消息体大小一样的情况下,其发送 TPS 最大。而同步发送,需要等待响应结果的返回,受网络状况的影响较大,故发送 TPS 就比较小。异步发送不等待响应结果,发送消息时几乎不受网络的影响,故相比同步发送来说,其发送 TPS 要大得多。
关于可靠性,大家需要牢记前面提过的,异步发送并不意味着消息不可靠,异步发送也是会接收到响应结果,也能对响应结果进行处理。即使发送失败,也可以通过一些补偿手段进行消息重发。和同步发送比起来,异步发送的发送 TPS 更大,更适合那些调用链路较长的一些场景。在实际使用中,同步发送和异步发送都是较为常用的两种方式,大家要视具体业务场景进行合理地选择。
使用场景
在实际使用场景中,利用何种发送方式,可以总结如下:
- 当发送的消息不重要时,采用
one-way
方式,以提高吞吐量; - 当发送的消息很重要是,且对响应时间不敏感的时候采用
sync
方式; - 当发送的消息很重要,且对响应时间非常敏感的时候采用
async
方式;
各种发送方法整理
批量消息发送
以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当然是可以的,RocketMQ为我们提供了这样的功能。
使用条件
但是它也有一些使用的条件:
- 同一批发送的消息的Topic必须相同;
- 同一批消息的waitStoreMsgOK 必须相同;
- 批量发送的消息不支持延迟;
- 同一批次的消息,大小不能超过1MiB;
代码演示
好了,只要我们满足上面的这些限制,就可以使用批量发送了,我们来看看发送端的代码吧,
1 | public class BatchProducer { |
其实批量发送很简单,我们只是把消息放到一个List当中,然后统一的调用send方法发送就可以了,消费端的代码没有任何的变化,正常的接收消息就可以了
MQ的重试机制
由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。
RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。
生产端重试
如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:
1 |
|
示例代码
1 | public class RetryProducer { |
超时重试纠正
超时重试`针对网上说的超时异常会重试的说法都是错误的,百度查的所以文章都说超时异常都会重试,经过产看源码才发现这个问题。
发现这个问题,是因为我上面超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但我设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报显然不应该是一个级别。但测试发现无论我设置的多少次的重试次数,报异常的时间都差不多。
1 | org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout |
重试解惑
针对这个疑惑,我去看了源码之后,才恍然大悟。
1 | /** |
重试总结
通过这段源码很明显可以看出以下几点
- 如果是异步发送那么重试次数只有1次
- 对于同步而言,超时异常也是不会再去重试。
- 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。
RocketMQ使用注意
Broker 故障规避
如果开启的话,发送异常后会根据配置计算一个不可用时间,然后选择broker的时候会判断是否在规避时间内,在的话就不跳过这个broker
禁止自动创建Topic
自动创建topic流程
autoCreateTopicEnable
设置为true 标识开启自动创建topic流程如下:
- 消息发送时如果根据topic没有获取到 路由信息,则会根据默认的topic去获取,获取到路由信息后选择一个队列进行发送,发送时报文会带上默认的topic以及默认的队列数量。
- 消息到达broker后,broker检测没有topic的路由信息,则查找默认topic的路由信息,查到表示开启了自动创建topic,则会根据消息内容中的默认的队列数量在本broker上创建topic,然后进行消息存储。
- broker创建topic后并不会马上同步给namesrv,而是每30进行汇报一次,更新namesrv上的topic路由信息,producer会每30s进行拉取一次topic的路由信息,更新完成后就可以正常发送消息。更新之前一直都是按照默认的topic查找路由信息。
禁用自动创建topic
为什么生产不能开启自动创建topic
上述 broker 中流程会有一个问题,就是在producer更新路由信息之前的这段时间,如果消息只发送到了broker-a,则broker-b上不会创建这个topic的路由信息,broker互相之间不通信。当producer更新之后,获取到的broker列表只有broker-a,就永远不会轮询到broker-b的队列(因为没有路由信息),所以我们生产通常关闭自动创建broker,而是采用手动创建的方式。
消息发布流程
生产者启动流程
DefalutMQProducer 是默认的消息生产者实现类,它实现 MQAdmin 接口。
- 检查 productGroup 是否符合要求
- 创建 MQClientInstance 实例,整个 JVM 实例中只存在一个 MQClientManager 实例,维护一个 MQClientInstance 缓存表,也就是同一个 clientId 只会创建一个 MQClientInstance。
- 启动 MQClientInstance 实例。
消息发送基本流程
消息发送流程主要的步骤:
消息长度验证
默认消息发送以同步方式发送,默认超时时间为 3s。
查找主题路由信息
消息发送之前,需要知道主题的路由信息,以此确定发送具体的 Broker 节点。
选择消息队列
根据路由信息选择消息队列(MessageQueue)。
消息发送
消息发送的 API 核心入口:
DefaultMQProducerImpl#sendKernelImpl
- 根据 MessageQueue 获取 Broker 的网络地址。
- 为消息分配全局唯一 ID,如果消息体默认超过 4K,会对消息体采用 zip 压缩。
- 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
- 构建消息发送请求包。
- 根据消息发送方式,同步、异步、单向方式进行网络传输。
- 如果注册了消息发送钩子函数,执行 after 逻辑。