抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ深入理解

消息生产者流程

生产者的流程主要讲述 DefaultMQProducer 类的具体实现。

消息发送的主要流程: 验证消息、 查找路由、 消息发送(包含异常机制)

验证消息

​ 主要是要求主题名称、消息体不能为空、消息长度不能等于 0,且不能超过消息的最大的长度 4M(生产者对象中配置maxMessageSize=1024*1024*4)

查找路由

​ 客户端(生产者)会缓存 topic 路由信息(如果是第一次发送消息,本地没有缓存,查询 NameServer 尝试获取),路由信息主要包含了消息队列(queue 相关信息)。

消息发送

​ 选择消息队列,发送消息,发送成功则返回。选择消息队列两种方式(一般有两种,这里不做详细讲解,后续做详细讲解)。

批量消息发送

注意单批次不能超过消息的最大的长度 4M(生产者对象中配置 maxMessageSize=1024*1024*4)

  • 批量消息要求必要具有统一Topic、相同消息配置
  • 不支持延时消息
  • 建议一个批量消息最好不要超过1MB大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SimpleBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        //如果一次只发送不超过4M的消息,那么批处理很容易使用
        //同一批的消息应该有:相同的主题,相同的waitstoremsgok,不支持调度
        String topic = "TopicTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);
        System.out.printf("Batch over");
        producer.shutdown();
    }
}

消息重试机制

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。

这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //生产者实例化
        DefaultMQProducer producer = new DefaultMQProducer("async");
        //指定rocket服务器地址
        producer.setNamesrvAddr("192.168.0.128:9876");
        //启动实例
        producer.start();
        //发送异步失败时的重试次数(默认值)
        producer.setRetryTimesWhenSendAsyncFailed(2);
        int messageCount = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("TopicTest","TagC","OrderID"+index,
                        ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //生产者异步发送
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //Thread.sleep(1000);
        countDownLatch.await(100, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

​ 通过代码和源码,一般发送并忘记没有重试。注意重试的原则,一般会采用规避原则(规避原则就是上一次消息发送过程中发现错误,在某一段时间内,消息生产者不会选择该Broker上的消息队列,这样可以提高发送消息的成功率)。

​ 注意重试的原则,一般会采用规避原则(规避原则就是上一次消息发送过程中发现错误,在某一段时间内,消息生产者不会选择该 Broker 上的消息队列,这样可以提高发送消息的成功率)

规避原则

​ 注意了,这里我们发现,有可能在实际的生产过程中,我们的 RocketMQ 有几台服务器构成的集群

​ 其中有可能是一个主题 TopicA 中的 4 个队列分散在 Broker1、Broker2、Broker3 服务器上。

​ 如果这个时候 Broker2 挂了,我们知道,但是生产者不知道(因为生产者客户端每隔 30S 更新一次路由,但是 NamServer 与 Broker 之间的心跳检测间隔是 10S,所以生产者最快也需要 30S 才能感知 Broker2 挂了),所以发送到 queue2 的消息会失败,RocketMQ 发现这次消息发送失败后,就会将 Broker2排除在消息的选择范围,下次再次发送消息时就不会发送到 Broker2,这样做的目的就是为了提高发送消息的成功率。

顺序消息的重试

​ 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重 试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试

​ 对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

​ 无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

死信队列

​ 当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

消息模式

RocketMQ提供两个模式进行消费

拉模式(PULL)

代码上使用DefaultMQPullConsumer

  1. 获取MessageQueues并遍历(一个Topic包括多个MessageQueue),如果是特殊情况,也可以选择指定的MessageQueue来读取消息。

  2. 维护Offsetstore,从一个MessageQueue里拉取消息时,要传入Offset参数,随着不断的读取消息,Offset会不断增长。这个时候就需要用户把Offset存储起来,根据实际的情况存入内存、写入磁盘或者数据库中。

  3. 根据不同的消息状态做不同的处理。

​ 拉取消息的请求后,会返回:FOUND(获取到消息),NO_MATCHED_MSG(没有匹配的消息),NO_NEW_MSG(没有新消息),OFFSET_ILLEGAL(非法偏移量)四种状态,其中必要重要的是FOUND(获取到消息)和NO_NEW_MSG(没有新消息)。

总结:这种模式下用户需要自己处理Queue,并且自己保存偏移量,所以这种方式太过灵活,往往我们业务的关注重点不在内部消息的处理上,所以一般情况下我们会使用推模式。

代码示例:消费者-拉模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    public static void main(String[] args) throws MQClientException {
        //拉模式
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullconsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //consumer.setBrokerSuspendMaxTimeMillis(1000);
        System.out.println("ms:"+consumer.getBrokerSuspendMaxTimeMillis());
        consumer.start();
        //1.获取MessageQueues并遍历(一个Topic包括多个MessageQueue  默认4个)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            System.out.println("queueID:"+ mq.getQueueId());
            //获取偏移量
            long Offset = consumer.fetchConsumeOffset(mq,true);
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) { //拉模式,必须无限循环
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq,null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n",pullResult);
                    //2.维护Offsetstore(这里存入一个Map)
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    //3.根据不同的消息状态做不同的处理
                    switch (pullResult.getPullStatus()) {
                        case FOUND: //获取到消息
                            for (int i=0;i<pullResult.getMsgFoundList().size();i++) {
                                System.out.printf("%s%n", new String(pullResult.getMsgFoundList().get(i).getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG: //没有匹配的消息
                            break;
                        case NO_NEW_MSG:  //没有新消息
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL: //非法偏移量
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }
}

推模式(PUSH)

代码上使用DefaultMQPushConsumer

​ Push方式是Server端接收到消息后,主动把消息推给Client端,实时性高,但是使用Push方式主动推送也存在一些问题:比如加大Server端的工作量,其次Client端的处理能力各不相同,如果Client不能及时处理Server推过来的消息,会造成各种潜在的问题。

长轮询

​ 所以RocketMQ使用“长轮询”的方式来解决以上问题,核心思想是这样,客户端还是拉取消息,Broker端HOLD住客户端发过来的请求一小段时间,在这个时间内(5s)有新消息达到,就利用现有的连接立刻返回消息给Consunmer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。因为长轮询方式的有局限性,是在HOLD住Consumer请求的时候需要占用资源,所以它适合在消息队列这种客户端连接数可控的场景中。

流量控制

​ Push模式基于拉取,消费者会判断获取但还未处理的消息个数、消息总大小、Offset的跨度3个维度来控制,如果任一值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。

​ 两种情况会限流,限流的做法是放弃本次拉取消息的动作,并且这个队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列。

  1. Push模式基于拉取,消费者会判断获取但还未处理的消息个数、消息总大小、Offset的跨度3个维度来控制,如果任一值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。

  2. 两种情况会限流,限流的做法是放弃本次拉取消息的动作,并且这个队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列。

消息队列负载与重新分布机制

​ 在集群消费模式中,往往会有很多个消费者,对应消费一个主题(topic),一个主题中有很多个消费者队列(queue),我们要考虑的问题是,集群内多个消费者是如何负载主题下的多个消费者队列,并且如果有新的消费者加入,消息队列又会如何重新分布。

​ 从源码的角度上看,RocketMQ消息队列重新分布是由RebalanceService线程来实现的,一个MQClientInstance持有一个RebalanceService实现,并且随着MQClientInstance的启动而启动。

备注:MQClientInstance是生产者和消费者中最大的一个实例,作为生产者或者消费者引用RocketMQ客户端,在一个JVM中所有消费者,生产者都持有同一个实例,MQClientInstance只会启动一次。

RocketMQ默认提供5中分配算法

如果有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8),有3个消费者(c1,c2,c3)

  • 平均分配(AllocateMessageQueueAveragely)
消费者 消息队列
c1 q1、q2、q3
c2 q4、q5、q6
c3 q7、q8
  • 平均轮询分配(AllocateMessageQueueAveragelyByCircle)
消费者 消息队列
c1 q1、q4、q7
c2 q2、q5、q8
c3 q3、q6
  • 一直性Hash(AllocateMessageQueueConsistentHash)

    不推荐使用,因为消息队列负载均衡信息不容易跟踪。

  • 根据配置(AllocateMessageQueueByConfig)

    为每一个消费者配置固定的消费队列。

  • 根据Broker部署机房名(AllocateMessageQueueByMachineRoom)

    ​ 对每一个消费者负载不同Broker上的队列。一般尽量使用“平均分配”“平均轮询分配”,因为分配算法比较直观。无论哪种算法,遵循的原则是一个消费者可以分配多个消息队列,同一个消息队列只会分配一个消费者,所以如果消费者个数大于消息队列数量,则有些消费者无法消费消息。

RebalanceService每隔20S进行一次队列负载

每次进行队列重新负载时会查询出当前所有的消费者,并且对消息队列、消费者列表进行排序。因为在一个JVM中只会有一个pullRequestQueue对象。具体可见源码中PullMessageService类中代码实现。

代码示例:消费者-推模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });
    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }
    public void executeTaskLater(final Runnable r, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }
    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }
    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
    @Override
    public void shutdown(boolean interrupt) {
        super.shutdown(interrupt);
        ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
    }
    @Override
    public String getServiceName() {
        return PullMessageService.class.getSimpleName();
    }
}
消息确认(ACK)

​ PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class PushConsumerB {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("192.168.0.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//每次从最后一次消费的地址
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("queueID:%d:%s:Messages:%s %n",  msgs.get(0).getQueueId(),Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("ConsumerPartOrder Started.%n");
    }
}

​ 如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字。

​ 为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费者的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。

消息ACK机制

​ RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某些已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

​ 每次消息消费成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度;但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值。

​ 这种方式和传统的一条message单独ack的方式有本质的区别,性能上提升的同时,会带来一个潜在的重复问题。由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

​ 在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

​ 在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
​ 对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

消息进度存储
广播模式

​ 同一个消费组的所有消费者都需要消费主题下的所有消息,因为消费者的行为都是独立的,互不影响,固消息进度需要独立存储,所以这种模式下消息进度存储在消费者本地。

集群模式

​ 集群模式消息进度存储文件存放在服务器Broker上。

顺序消息

​ 顺序消息(FIFO消息)是消息队列RocketMQ提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。顺序消息分为全局顺序消息和分区顺序消息。

全局顺序消息

​ RocketMQ在默认情况下不保证顺序,要保证全局顺序,需要把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。所以这样的话高并发,高吞吐量的功能完全用不上。

适用场景

适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

​ 要确保全局顺序消息,需要先把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。

1
mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876

​ 在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。

部分顺序消息

​ 对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费;Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

延时消息

概念介绍

延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

适用场景

​ 消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单,如已完成支付则忽略.

使用方式

​ Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销(阿里云RocketMQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)。
​ 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。延迟消息是根据延迟队列的level来的,延迟队列默认是msg.setDelayTimeLevel(5)代表延迟一分钟。

​ “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为18表示延迟2个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

代码示例-延时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TimerProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("sync");
        producer.setNamesrvAddr("192.168.0.128:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest" ,"TagB" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 这个是设置延时消息的属性
            //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"  18个等级
            msg.setDelayTimeLevel(4);
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消息过滤

概念介绍

​ RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是可以实现服务端的过滤。

表达式过滤

主要支持如下2种的过滤方式

Tag过滤方式

​ Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG可以用||分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层——Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同则丢弃该消息,不进行消息消费。

QL92的过滤方式

​ 这种方式的大致做法和上面的Tag过滤方式一样,只是具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。

具体使用见 http://rocketmq.apache.org/docs/filter-by-sql92-example/

注意:如果开启SQL过滤的话,Broker需要开启参数enablePropertyFilter=true,然后服务器重启生效。

代码示例
生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SqlProducer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }
        for (int i = 0; i < 10; i++) {
            try {
                String tag;
                int div = i % 3;
                if (div == 0) {
                    tag = "TagA";
                } else if (div == 1) {
                    tag = "TagB";
                } else {
                    tag = "TagC";
                }
                Message msg = new Message("SQL", tag,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("a", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        producer.shutdown();
    }
}
消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SqlConsumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        try {
            consumer.subscribe("SQL",MessageSelector.bySql(" TAGS is not null and TAGS in ('TagA', 'TagB')  "));
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("queueID:%d:%s:Messages:%s %n",  msgs.get(0).getQueueId(),
                        msgs.get(0).getTags(), new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }
        System.out.printf("ConsumerPartOrder Started.%n");
    }
}
类过滤

​ 新版本(>=4.3.0)已经不支持(新版本已经不支持了)

代码示例

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 消费者-Filter过滤 (这个新版本淘汰了)
*/
public class FilterServerConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterServer");
consumer.setNamesrvAddr("192.168.0.128:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());


String filterCode = MixAll.file2String(classFile);
System.out.println(filterCode);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
filterCode);

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.printf("ConsumerPartOrder Started.%n");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 生产者-Filter过滤(这个新版本淘汰了)
*/
public class FilterServerProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.0.128:9876");
producer.start();
try {
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicFilter",
"TagA",
"OrderID001",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

msg.putUserProperty("SequenceId", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

评论