RocketMQ顺序消息
顺序消费
无序消息
无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费
比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。
局部顺序
在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。
就好比一个订单涉及 订单生成
,订单支付
、订单完成
。我不用管其它的订单,只保证同样订单ID能保证这个顺序
就可以了。
Rocket顺序消息
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。
之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)
实现原理
我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了16个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到16个queue,
好比有100条消息,那么这100条消息会平均分配在这16个Queue上,那么每个Queue大概放5~6个左右。这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则
这个时候思路就来了,好比有orderId=1的3条消息,分别是 订单生产、订单付款、订单完成。只要保证它们放到同一个Queue那就保证消费者先进先出了。
这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。
那么全局消息呢
这个就简单啦,你把所有消息都放在一个Queue里,这样不就保证全局消息了。
就这么简单
当然不是,这里还有很关键的一点,好比在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 订单生成,它拿完后,消费者2去queue拿到的是 订单支付。
拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到订单生成,但由于网络等原因,消费者2比你真正的先消费消息。这是不是很尴尬了。
订单付款还是可能会比订单生成更早消费的情况。那怎么办。
分布式锁来了
Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。
Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。
所以最终的消费者这边的逻辑就是
消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。
然后下一个消费者去拿到 订单支付 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。
消息类型对比
全局顺序与分区顺序对比
Topic消息类型 |
支持事务消息 |
支持定时/延时消息 |
性能 |
无序消息(普通、事务、定时/延时) |
是 |
是 |
最高 |
分区顺序消息 |
否 |
否 |
高 |
全局顺序消息 |
否 |
否 |
一般 |
发送方式对比
Topic消息类型 |
支持可靠同步发送 |
支持可靠异步发送 |
支持Oneway发送 |
无序消息(普通、事务、定时/延时) |
是 |
是 |
是 |
分区顺序消息 |
是 |
否 |
否 |
全局顺序消息 |
是 |
否 |
否 |
注意事项
- 顺序消息暂不支持广播模式。
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
- 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
- 对于全局顺序消息,建议创建实例个数 >=2。
代码示例
这里保证两点
- 生产端 同一orderID的订单放到同一个queue。
- 消费端 同一个queue取出消息的时候锁住整个queue,直到消费后再解锁。
相关代码
实体类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ProductOrder { private String orderId; private String orderName;
public ProductOrder(String orderId, String orderName) { this.orderId = orderId; this.orderName = orderName; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getOrderName() { return orderName; }
public void setOrderName(String orderName) { this.orderName = orderName; } }
|
Product(生产者)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public class OrderProducer { private static final List<ProductOrder> orderList = new ArrayList<>();
static { orderList.add(new ProductOrder("XXX001", "订单创建")); orderList.add(new ProductOrder("XXX001", "订单付款")); orderList.add(new ProductOrder("XXX001", "订单完成")); orderList.add(new ProductOrder("XXX002", "订单创建")); orderList.add(new ProductOrder("XXX002", "订单付款")); orderList.add(new ProductOrder("XXX002", "订单完成")); orderList.add(new ProductOrder("XXX003", "订单创建")); orderList.add(new ProductOrder("XXX003", "订单付款")); orderList.add(new ProductOrder("XXX003", "订单完成")); }
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();
for (int i = 0; i < orderList.size(); i++) { ProductOrder order = orderList.get(i); Message message = new Message( "topicTest", order.getOrderId(), (order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET) );
SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object args) { String orderid = (String) args; int hashCode = orderid.hashCode(); hashCode = Math.abs(hashCode); long index = hashCode % mqs.size(); return mqs.get((int) index); } }, order.getOrderId());
System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType()); }
producer.shutdown(); } }
|
Consumer(消费者)
上面说过,消费者真正要达到消费顺序,需要分布式锁,所以这里需要将MessageListenerOrderly
替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class OrderConsumer { private static final Random random = new Random();
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) { if (list != null) { for (MessageExt ext : list) { try { try { TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); }
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET); int queueId = context.getMessageQueue().getQueueId(); System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],接收queueId:[" + queueId + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
} catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } return ConsumeOrderlyStatus.SUCCESS; } });
consumer.start(); System.out.println("消息消费者已启动"); } }
|
测试
生产者发送消息
看看生产者有没有把相同订单指定到同一个queue
1 2 3 4 5 6 7 8 9
| product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单创建 product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单付款 product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单完成 product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单创建 product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单付款 product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单完成 product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单创建 product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单付款 product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单完成
|
通过测试结果可以看出:相同订单已经存到同一queue中了
。
消费者消费消息
单消费者
看看消费结果是不是我们需要的结果
1 2 3 4 5 6 7 8 9 10
| 消息消费者已启动 Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114160498],消息=[ProductOrder{orderId='XXX002', type='订单创建'}] Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114165490],消息=[ProductOrder{orderId='XXX001', type='订单创建'}] Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114167523],消息=[ProductOrder{orderId='XXX003', type='订单创建'}] Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114169502],消息=[ProductOrder{orderId='XXX002', type='订单付款'}] Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114172492],消息=[ProductOrder{orderId='XXX001', type='订单付款'}] Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114173503],消息=[ProductOrder{orderId='XXX002', type='订单完成'}] Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114173524],消息=[ProductOrder{orderId='XXX003', type='订单付款'}] Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114179493],消息=[ProductOrder{orderId='XXX001', type='订单完成'}] Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114182524],消息=[ProductOrder{orderId='XXX003', type='订单完成'}]
|
MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。
消费异常
如果出现消费异常返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
后面的消息将无法消费。
多消费者
消费者A
1 2 3
| Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115320966],消息=[ProductOrder{orderId='XXX002', type='订单创建'}] Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115323966],消息=[ProductOrder{orderId='XXX002', type='订单付款'}] Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115323966],消息=[ProductOrder{orderId='XXX002', type='订单完成'}]
|
消费者B
1 2 3 4
| Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115319873],消息=[ProductOrder{orderId='XXX001', type='订单创建'}] Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115326873],消息=[ProductOrder{orderId='XXX001', type='订单付款'}] Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115333873],消息=[ProductOrder{orderId='XXX001', type='订单完成'}]
|
消费者C
1 2 3 4
| Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115326417],消息=[ProductOrder{orderId='XXX003', type='订单创建'}] Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115333418],消息=[ProductOrder{orderId='XXX003', type='订单付款'}] Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115341418],消息=[ProductOrder{orderId='XXX003', type='订单完成'}]
|
小结
通过测试结果我们看出
- 消费消息的顺序并没有完全按照之前的先进先出,即没有满足全局顺序。
- 同一订单来讲,订单的 订单生成、订单支付、订单完成 消费顺序是保证的。
这是局部保证顺序消费就已经满足我们当前实际开发中的需求了。
rocketmq的顺序消息需要满足2点:
- Producer端保证发送消息有序,且发送到同一个队列。
- consumer端保证消费同一个队列。