RocketMQ深入理解
消息生产者流程
生产者的流程主要讲述 DefaultMQProducer 类的具体实现。
消息发送的主要流程: 验证消息、 查找路由、 消息发送(包含异常机制)
验证消息
主要是要求主题名称、消息体不能为空、消息长度不能等于 0,且不能超过消息的最大的长度 4M(生产者对象中配置maxMessageSize=1024*1024*4)
查找路由
客户端(生产者)会缓存 topic 路由信息(如果是第一次发送消息,本地没有缓存,查询 NameServer 尝试获取),路由信息主要包含了消息队列(queue 相关信息)。
消息发送
选择消息队列,发送消息,发送成功则返回。选择消息队列两种方式(一般有两种,这里不做详细讲解,后续做详细讲解)。
批量消息发送
注意单批次不能超过消息的最大的长度 4M(生产者对象中配置 maxMessageSize=1024*1024*4)
- 批量消息要求必要具有统一Topic、相同消息配置
- 不支持延时消息
- 建议一个批量消息最好不要超过1MB大小
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class SimpleBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducer"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String topic = "TopicTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes())); producer.send(messages); System.out.printf("Batch over"); producer.shutdown(); } }
|
消息重试机制
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class AsyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("async"); producer.setNamesrvAddr("192.168.0.128:9876"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(2); int messageCount = 10; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("TopicTest","TagC","OrderID"+index, ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, new String(msg.getBody())); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(100, TimeUnit.SECONDS); producer.shutdown(); } }
|
通过代码和源码,一般发送并忘记没有重试。注意重试的原则,一般会采用规避原则(规避原则就是上一次消息发送过程中发现错误,在某一段时间内,消息生产者不会选择该Broker上的消息队列,这样可以提高发送消息的成功率)。
注意重试的原则,一般会采用规避原则(规避原则就是上一次消息发送过程中发现错误,在某一段时间内,消息生产者不会选择该 Broker 上的消息队列,这样可以提高发送消息的成功率)
规避原则
注意了,这里我们发现,有可能在实际的生产过程中,我们的 RocketMQ 有几台服务器构成的集群
其中有可能是一个主题 TopicA 中的 4 个队列分散在 Broker1、Broker2、Broker3 服务器上。
如果这个时候 Broker2 挂了,我们知道,但是生产者不知道(因为生产者客户端每隔 30S 更新一次路由,但是 NamServer 与 Broker 之间的心跳检测间隔是 10S,所以生产者最快也需要 30S 才能感知 Broker2 挂了),所以发送到 queue2 的消息会失败,RocketMQ 发现这次消息发送失败后,就会将 Broker2排除在消息的选择范围,下次再次发送消息时就不会发送到 Broker2,这样做的目的就是为了提高发送消息的成功率。
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重 试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
消息模式
RocketMQ提供两个模式进行消费
拉模式(PULL)
代码上使用DefaultMQPullConsumer
获取MessageQueues并遍历(一个Topic包括多个MessageQueue),如果是特殊情况,也可以选择指定的MessageQueue来读取消息。
维护Offsetstore,从一个MessageQueue里拉取消息时,要传入Offset参数,随着不断的读取消息,Offset会不断增长。这个时候就需要用户把Offset存储起来,根据实际的情况存入内存、写入磁盘或者数据库中。
根据不同的消息状态做不同的处理。
拉取消息的请求后,会返回:FOUND(获取到消息),NO_MATCHED_MSG(没有匹配的消息),NO_NEW_MSG(没有新消息),OFFSET_ILLEGAL(非法偏移量)四种状态,其中必要重要的是FOUND(获取到消息)和NO_NEW_MSG(没有新消息)。
总结:这种模式下用户需要自己处理Queue,并且自己保存偏移量,所以这种方式太过灵活,往往我们业务的关注重点不在内部消息的处理上,所以一般情况下我们会使用推模式。
代码示例:消费者-拉模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullconsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); System.out.println("ms:"+consumer.getBrokerSuspendMaxTimeMillis()); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { System.out.println("queueID:"+ mq.getQueueId()); long Offset = consumer.fetchConsumeOffset(mq,true); System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq,null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n",pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: for (int i=0;i<pullResult.getMsgFoundList().size();i++) { System.out.printf("%s%n", new String(pullResult.getMsgFoundList().get(i).getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
|
推模式(PUSH)
代码上使用DefaultMQPushConsumer
Push方式是Server端接收到消息后,主动把消息推给Client端,实时性高,但是使用Push方式主动推送也存在一些问题:比如加大Server端的工作量,其次Client端的处理能力各不相同,如果Client不能及时处理Server推过来的消息,会造成各种潜在的问题。
长轮询
所以RocketMQ使用“长轮询”的方式来解决以上问题,核心思想是这样,客户端还是拉取消息,Broker端HOLD住客户端发过来的请求一小段时间,在这个时间内(5s)有新消息达到,就利用现有的连接立刻返回消息给Consunmer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。因为长轮询方式的有局限性,是在HOLD住Consumer请求的时候需要占用资源,所以它适合在消息队列这种客户端连接数可控的场景中。
流量控制
Push模式基于拉取,消费者会判断获取但还未处理的消息个数、消息总大小、Offset的跨度3个维度来控制,如果任一值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。
两种情况会限流,限流的做法是放弃本次拉取消息的动作,并且这个队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列。
Push模式基于拉取,消费者会判断获取但还未处理的消息个数、消息总大小、Offset的跨度3个维度来控制,如果任一值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。
两种情况会限流,限流的做法是放弃本次拉取消息的动作,并且这个队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列。
消息队列负载与重新分布机制
在集群消费模式中,往往会有很多个消费者,对应消费一个主题(topic),一个主题中有很多个消费者队列(queue),我们要考虑的问题是,集群内多个消费者是如何负载主题下的多个消费者队列,并且如果有新的消费者加入,消息队列又会如何重新分布。
从源码的角度上看,RocketMQ消息队列重新分布是由RebalanceService线程来实现的,一个MQClientInstance持有一个RebalanceService实现,并且随着MQClientInstance的启动而启动。
备注:MQClientInstance是生产者和消费者中最大的一个实例,作为生产者或者消费者引用RocketMQ客户端,在一个JVM中所有消费者,生产者都持有同一个实例,MQClientInstance只会启动一次。
RocketMQ默认提供5中分配算法
如果有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8),有3个消费者(c1,c2,c3)
- 平均分配(AllocateMessageQueueAveragely)
消费者 |
消息队列 |
c1 |
q1、q2、q3 |
c2 |
q4、q5、q6 |
c3 |
q7、q8 |
- 平均轮询分配(AllocateMessageQueueAveragelyByCircle)
消费者 |
消息队列 |
c1 |
q1、q4、q7 |
c2 |
q2、q5、q8 |
c3 |
q3、q6 |
一直性Hash(AllocateMessageQueueConsistentHash)
不推荐使用,因为消息队列负载均衡信息不容易跟踪。
根据配置(AllocateMessageQueueByConfig)
为每一个消费者配置固定的消费队列。
根据Broker部署机房名(AllocateMessageQueueByMachineRoom)
对每一个消费者负载不同Broker上的队列。一般尽量使用“平均分配”“平均轮询分配”,因为分配算法比较直观。无论哪种算法,遵循的原则是一个消费者可以分配多个消息队列,同一个消息队列只会分配一个消费者,所以如果消费者个数大于消息队列数量,则有些消费者无法消费消息。
RebalanceService每隔20S进行一次队列负载
每次进行队列重新负载时会查询出当前所有的消费者,并且对消息队列、消费者列表进行排序。因为在一个JVM中只会有一个pullRequestQueue对象。具体可见源码中PullMessageService类中代码实现。
代码示例:消费者-推模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| public class PullMessageService extends ServiceThread { private final InternalLogger log = ClientLogger.getLog(); private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); private final MQClientInstance mQClientFactory; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "PullMessageServiceScheduledThread"); } }); public PullMessageService(MQClientInstance mQClientFactory) { this.mQClientFactory = mQClientFactory; } public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } } public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } } public void executeTaskLater(final Runnable r, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } } public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } @Override public void shutdown(boolean interrupt) { super.shutdown(interrupt); ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS); } @Override public String getServiceName() { return PullMessageService.class.getSimpleName(); } }
|
消息确认(ACK)
PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class PushConsumerB { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("192.168.0.128:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("queueID:%d:%s:Messages:%s %n", msgs.get(0).getQueueId(),Thread.currentThread().getName(), new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("ConsumerPartOrder Started.%n"); } }
|
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费者的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。
消息ACK机制
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某些已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。
每次消息消费成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度;但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值。
这种方式和传统的一条message单独ack的方式有本质的区别,性能上提升的同时,会带来一个潜在的重复问题。由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。
消息进度存储
广播模式
同一个消费组的所有消费者都需要消费主题下的所有消息,因为消费者的行为都是独立的,互不影响,固消息进度需要独立存储,所以这种模式下消息进度存储在消费者本地。
集群模式
集群模式消息进度存储文件存放在服务器Broker上。
顺序消息
顺序消息(FIFO消息)是消息队列RocketMQ提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。顺序消息分为全局顺序消息和分区顺序消息。
全局顺序消息
RocketMQ在默认情况下不保证顺序,要保证全局顺序,需要把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。所以这样的话高并发,高吞吐量的功能完全用不上。
适用场景
适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
要确保全局顺序消息,需要先把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。
1
| mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876
|
在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
部分顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费;Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
延时消息
概念介绍
延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单,如已完成支付则忽略.
使用方式
Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销(阿里云RocketMQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。延迟消息是根据延迟队列的level来的,延迟队列默认是msg.setDelayTimeLevel(5)代表延迟一分钟。
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为18表示延迟2个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
代码示例-延时消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class TimerProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("sync"); producer.setNamesrvAddr("192.168.0.128:9876"); producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("TopicTest" ,"TagB" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.setDelayTimeLevel(4); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
|
消息过滤
概念介绍
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是可以实现服务端的过滤。
表达式过滤
主要支持如下2种的过滤方式
Tag过滤方式
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG可以用||分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层——Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同则丢弃该消息,不进行消息消费。
QL92的过滤方式
这种方式的大致做法和上面的Tag过滤方式一样,只是具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
具体使用见 http://rocketmq.apache.org/docs/filter-by-sql92-example/
注意:如果开启SQL过滤的话,Broker需要开启参数enablePropertyFilter=true,然后服务器重启生效。
代码示例
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class SqlProducer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); return; } for (int i = 0; i < 10; i++) { try { String tag; int div = i % 3; if (div == 0) { tag = "TagA"; } else if (div == 1) { tag = "TagB"; } else { tag = "TagC"; } Message msg = new Message("SQL", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } } producer.shutdown(); } }
|
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class SqlConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); try { consumer.subscribe("SQL",MessageSelector.bySql(" TAGS is not null and TAGS in ('TagA', 'TagB') ")); } catch (MQClientException e) { e.printStackTrace(); return; } consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("queueID:%d:%s:Messages:%s %n", msgs.get(0).getQueueId(), msgs.get(0).getTags(), new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); return; } System.out.printf("ConsumerPartOrder Started.%n"); } }
|
类过滤
新版本(>=4.3.0)已经不支持(新版本已经不支持了)
代码示例
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public class FilterServerConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterServer"); consumer.setNamesrvAddr("192.168.0.128:9876"); consumer.setMessageModel(MessageModel.CLUSTERING); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
String filterCode = MixAll.file2String(classFile); System.out.println(filterCode); consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl", filterCode);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start();
System.out.printf("ConsumerPartOrder Started.%n"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
public class FilterServerProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.0.128:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicFilter", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("SequenceId", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
|