抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ主从同步

概要

​ 高可用(HA 机制)特性是目前分布式系统中必备的特性之一,对一个中间件来说没有HA机制必然是一个重大的缺陷。RocketMQ的Broker 分为 Master (主)和 Slave(从)两个角色,为了保证高可用性,Master 角色的机器接收到消息后 ,要把内容同步到 Slave 机器上,这样一旦 Master 宕机,Slave 机 器依然可以提供服务。这个就是 RocketMQ 实现高可用(HA 机制)的原理。

​ 为了提高消息消费的高可用性,避免 Broker 发生单点故障引起存储在 Broker 上的消息无法及时消费, RocketMQ 引入了 Broker 主从机制:即消息 消费到达主服务器(Master)后需要消息同步到消息从服务器(Slave),如果主服务器 Broker 宕机后,消息消费者可以从从服务器拉取消息。

​ 同时 RocketMQ 依赖 NameServer, 所以为了确保高可用,同时要确保 NameServer 的高可用,一般通过部署多台 NamesServer 服务器来实现,但彼此 之间互不通信,也就是 NameServer 务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是 RocketMQ NameServer 设计的一个亮点。

RocketMQ集群部署模式

集群部署模式

单master模式

也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用

多master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

优点

​ 配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

缺点

​ 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会 对订阅该 topic 的应用造成影响。

多Master多Slave模式-异步复制

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备 模式。

优点

​ 即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,一般情况下都是 master 消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。

缺点

​ 使用异步复制的同步方式有可能会有消息丢失的问题。

多master多slave同步-双写模式

同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。

优点

​ 同步双写的同步模式能保证数据不丢失。,数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

缺点

​ 性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

同步方式

​ 同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)

刷盘策略

​ 同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储进入磁盘)

注意:对数据要求较高的场景,建议的持久化策略是主 broker 和从 broker 采用同步复制方式,主从 broker 都采用异步刷盘方式。通过同步复制方式,保存数据热备份,通过异步刷盘方式,保证 rocketMQ 高吞吐量。

安装部署过程

注意事项

注意,默认RocketMQ会吃8G,所以需要修改默认加载内存设置。

修改broker启动脚本runbroker.sh里面的jvm参数

JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”改为

JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m”

RocketMQ提供了初始的集群部署模式下的配置文件,如下图:

双主集群安装

服务器相关配置信息
NameServer集群

192.168.56.102

192.168.56.103

Broker服务器

192.168.56.102(主A)

192.168.56.103(主B)

注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-noslave/)需要修改(同时修改nameserver地址为集群地址):

配置文件配置

使用2m-noslave配置模板

broker-a.properties 配置

192.168.56.102(主A)的broker-a.properties 增加:

1
2
3
brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-b.properties

192.168.56.103 (主B)broker-b.properties 增加:

1
2
3
brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤

记得关闭防火墙或者要开通9876端口

启动NameServer集群

这里使用102和103两台作为集群即可

  1. 在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下:然后执行:

    1
    nohup sh mqnamesrv &
  2. 在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下:然后执行:

    1
    nohup sh mqnamesrv &
启动双主集群

顺序是先启动主,然后启动从

启动主A

启动主A:102服务器进入至MQ文件夹/bin下,执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):

1
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log

启动主B

启动主B:103服务器进入至MQ文件夹\bin下,执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-noslave/broker-b.properties  autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log
查看日志

每台服务器查看日志

1
tail -f ~/logs/rocketmqlogs/broker.log
启动控制台

如果是要启动控制台,则需要重新打包:

进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。

rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成。在把编译后的jar包丢上服务器:

启动命令

1
nohup java -jar rocketmq-console-ng-1.0.1.jar &

双主双从同步集群安装

服务器相关配置信息
NameServer集群

192.168.56.102

192.168.56.103

Broker服务器

192.168.56.102(主A)

192.168.56.103(主B)

192.168.56.104(从A)

192.168.56.105(从B)

注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver地址为集群地址):

配置文件配置

使用2m-2s-sync配置模板

broker-a.properties

192.168.56.102(主A)的broker-a.properties增加:

1
2
3
brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-b.properties

192.168.56.103(主B)的broker-b.properties增加:

1
2
3
brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-a-s.properties

192.168.56.104从A)的broker-a-s.properties增加:

1
2
3
brokerIP1=192.168.56.104

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-b-s.properties

192.168.56.105(从B)broker-b-s.properties增加:

1
2
3
brokerIP1=192.168.56.105

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤

记得关闭防火墙或者要开通9876端口

启动NameServer集群

这里使用102和103两台作为集群即可

启动机器A

在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下,然后执行:

1
nohup sh mqnamesrv &

启动机器B

在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下,然后执行:

1
nohup sh mqnamesrv &
启动双主双从同步集群

顺序是先启动主,然后启动从

启动主A

启动主A:,102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):

1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &

启动主B

启动主B,103服务器进入至MQ文件夹\bin下,执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &

启动从A

启动从A,104服务器进入至MQ文件夹\bin下,执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &

启动从B

启动从B,105服务器进入至MQ文件夹\bin下,执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &
查看日志

每台服务器查看日志:

1
tail -f ~/logs/rocketmqlogs/broker.log

双主双从异步集群安装

服务器相关配置信息
NameServer集群

192.168.56.102

192.168.56.103

Broker服务器

192.168.56.102(主A)

192.168.56.103(主B)

192.168.56.104(从A)

192.168.56.105(从B)

配置文件配置

使用2m-2s-async配置模板

注意:因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-async/)需要修改(同时修改nameserver地址为集群地址):

broker-a.properties

192.168.56.102(主A)的broker-a.properties增加:

1
2
3
brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-b.properties

192.168.56.103(主B)的broker-b.properties增加:

1
2
3
brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-a-s.properties

192.168.56.104(从A)的broker-a-s.properties增加:

1
2
3
brokerIP1=192.168.56.104

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
broker-b-s.properties

192.168.56.105(从B)的broker-b-s.properties增加:

1
2
3
brokerIP1=192.168.56.105

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤

记得关闭防火墙或者要开通9876端口

启动NameServer集群

这里使用102和103两台作为集群即可

启动机器A

在机器A,启动第1台NameServer: 102服务器进入至MQ文件夹/bin下,然后执行:

1
nohup sh mqnamesrv &

启动机器B

在机器B,启动第2台NameServer: 103服务器进入至MQ文件夹/bin下,然后执行:

1
nohup sh mqnamesrv &
启动双主双从同步集群

顺序是先启动主,然后启动从

启动主A

启动主A: 102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):

1
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true &

启动主B

启动主B:103服务器进入至MQ文件夹\bin下:执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties autoCreateTopicEnable=true &

启动从A

启动从A: 104服务器进入至MQ文件夹\bin下:执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties autoCreateTopicEnable=true &

启动从B

启动从B:105服务器进入至MQ文件夹\bin下:执行以下命令:

1
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties autoCreateTopicEnable=true &
查看日志

每台服务器查看日志

1
tail -f ~/logs/rocketmqlogs/broker.log

启动控制台

如果是要启动控制台,则需要重新打包:

进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。

例如:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true编译生成jar包,在把编译后的jar包丢上服务器:

启动命令
1
nohup java -jar rocketmq-console-ng-1.0.1.jar &

主从复制原理

详细参见主从复制原理

RocketMQ 主从同步(HA)实现过程如下:

  1. 主服务器启动,并在特定端口上监听从服务器的连接。

  2. 从服务器主动连接主服务器,主服务器接受客户端的连接,并建立相关 TCP 连接。

  3. 从服务器主动向服务器发送待拉取消息偏移 ,主服务器解析请求并返回消息给从服务器。

  4. 从服务器保存消息并继续发送新的消息同步请求

核心实现

​ 从服务器在启动的时候主动向主服务器建立 TCP 长连接,然后获取服务器的 commitlog 最大偏移,以此偏移向主服务器主动拉取消息,主服务器根 据偏移量,与自身 commitlog 文件的最大偏移进行比较,如果大于从服务器 commitlog 偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。

读写分离机制

​ RocketMQ 读写分离与他中间件的实现方式完全不同, RocketMQ 是消费者首先服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。

​ 那消息服务端是根据何种规则来建议哪个消息消费队列该从哪台 Broker 服务器上拉取消息呢?

​ 一般都是从主服务器拉取,如果主阶段拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时从从服务器拉取。

​ 如果主服务器繁忙则建议下 次从从服务器拉取消息,设置 suggestWhichBrokerld 配置文件中 whichBrokerWhenConsumeSlowly 属性,默认为 1。如 果一个 Master 拥有多台 Slave 服务器,参与消息拉取负载的从服务器只会是其中一个。

与Spring集成

pom文件

1
2
3
4
5
6
<!--RocketMQ-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

生产者

生产者配置信息

applicationContext.xml

1
2
3
4
5
<!-- 生产者配置 -->
<bean id="rocketMQProducer" class="com.chj.producer.RocketMQProducer" init-method="init" destroy-method="destroy">
    <property name="producerGroup" value="ProducerGroup" />
    <property name="namesrvAddr" value="192.168.0.128:9876" />
</bean>
生产者代码实现
发送入口代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

@Controller
@RequestMapping("/rocket")
public class RocketController {
   @Autowired
   @Qualifier("rocketMQProducer")
   private RocketMQProducer producer;
   /**
    * 消息发送
    */
   @ResponseBody
   @RequestMapping("spring")
   public String queueSender(@RequestParam("message")String message){
      String opt="";
      try {
         Message msg = new Message("rocket-spring-topic", "TAG1", message.getBytes());
         SendResult result = producer.getDefaultMQProducer().send(msg);
         if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){
            opt = "suc";
         }else{
            opt = "err";
         }

      } catch (Exception e) {
         opt = e.getCause().toString();
      }
      return opt;
   }
   @ResponseBody
   @RequestMapping("springb")
   public String topicSender(@RequestParam("message")String message){
      String opt = "";
      try {
         Message msg = new Message("rocket-spring-topic-b", "TAG1", message.getBytes());
         SendResult result = producer.getDefaultMQProducer().send(msg);
         System.out.println("SendStatus:"+result.getSendStatus());
         if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){
            opt = "suc";
         }else{
            opt = "err";
         }

      } catch (Exception e) {
         opt = e.getCause().toString();
      }
      return opt;
   }
}
生产者代码封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public class RocketMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);
    private DefaultMQProducer defaultMQProducer;
    private String producerGroup;
    private String namesrvAddr;
    public void init() throws MQClientException {
        this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
        defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
        defaultMQProducer.start();
        logger.info("rocketMQ初始化生产者完成[producerGroup:" + producerGroup + "]");
    }
    public void destroy() {
        defaultMQProducer.shutdown();
        logger.info("rocketMQ生产者[producerGroup: " + producerGroup + "]已停止");
    }
    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }
    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

消费者

消费者监听配置

applicationContext.xml中使用监听器的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<!-- 消费者监听1 -->
<bean id="messageListeners" class="com.chj.listener.MessageListenerImpl"></bean>
<!-- 消费者监听2 -->
<bean id="bmessageListeners" class="com.chj.listener.BMessageListenerImpl"></bean>
<!-- 消费者配置1 -->
<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
      init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="ConsumerGroup" />
    <property name="namesrvAddr" value="192.168.0.128:9876" />
    <property name="messageListener" ref="messageListeners" />
    <property name="subscription">
        <map>
            <entry key="rocket-spring-topic" value="TAG1" />
        </map>
    </property>
</bean>
<!-- 消费者配置2 -->
<bean id="rocketmqConsumer2" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
      init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="ConsumerGroup2" />
    <property name="namesrvAddr" value="192.168.0.128:9876" />
    <property name="messageListener" ref="bmessageListeners" />
    <property name="subscription">
        <map>
            <entry key="rocket-spring-topic-b" value="TAG1" />
        </map>
    </property>
</bean>
消费者代码实现
监听消息代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public class MessageListenerImpl implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                System.out.println(">>>>" + new String(msg.getBody(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        // 如果没有异常会认为都成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

与SpringBoot集成

配置信息

跟Spring和原生非常类似,maven配置如下:

POM配置
1
2
3
4
5
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
application.properties配置
1
2
#============== rocket ===================
rocketmq.namesrvaddr = 127.0.0.1:9876

生产者代码实现

发送入口代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

@RestController
@RequestMapping("/rocket")
public class RocketController {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private MQProducer mqProducer;
    @RequestMapping(value = "/send")
    public String sendrocket(@RequestParam(required = false) String data,@RequestParam(required = false) String tag) {
        try {
            logger.info("rocket的消息={}", data);
            mqProducer.sendMessage(data,"TopicTest", tag, null);
            return "发送rocket成功";
        } catch (Exception e) {
            logger.error("发送rocket异常:", e);
            return "发送rocket失败";
        }
    }
}
生产者代码封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
public class MQProducer {
    private  static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);
    @Value("${rocketmq.namesrvaddr}")
    private String nameservAddr;
    private final DefaultMQProducer producer = new DefaultMQProducer("TestProducer");
    /*
    * 初始化
     */
    @PostConstruct
    public void  start(){
        try {
            LOGGER.info("MQ:启动生产者");
            producer.setNamesrvAddr(nameservAddr);
            producer.start();
        }catch (MQClientException e){
            LOGGER.error("MQ:启动生产者失败:{}-{}",e.getResponseCode(),e.getErrorMessage());
            throw  new RuntimeException(e.getErrorMessage(),e);
        }
    }
    /*
    *发送消息
     */
    public void sendMessage(String data,String topic,String tags,String keys){
        try {
            byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message message = new Message(topic,tags,keys,messageBody);

            producer.send(message, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info("MQ: 生产者发送消息{}",sendResult);
                }

                public void onException(Throwable e) {
                    LOGGER.error(e.getMessage(),e);
                }
            });

        }catch (Exception e){
            LOGGER.error(e.getMessage(),e);
        }
    }
    @PreDestroy
    public void stop(){
        if(producer !=null){
            producer.shutdown();
            LOGGER.info("MQ:关闭生产者");
        }
    }
}

消费者

消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

@Component
public class MQPushConsumer implements MessageListenerConcurrently {
    private  static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class);
    @Value("${rocketmq.namesrvaddr}")
    private String nameservAddr;
    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
    /*
     * 初始化
     */
    @PostConstruct
    public void  start(){
        try {
            LOGGER.info("MQ:启动消费者");
            consumer.setNamesrvAddr(nameservAddr);
            //消息队列从头开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //集群消费模式
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest","*");
            //注册消息监听器
            consumer.registerMessageListener(this);
            consumer.start();
        }catch (MQClientException e){
            LOGGER.error("MQ:启动消费者失败:{}-{}",e.getResponseCode(),e.getErrorMessage());
            throw  new RuntimeException(e.getErrorMessage(),e);
        }
    }
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index =0;
        try {
            for(;index<msgs.size();index++){
                MessageExt msg = msgs.get(index);
                String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.printf("消费者监听: queueID:%d:Messages:%s %n",  msgs.get(index).getQueueId(),messageBody);
            }
        }catch (Exception e){
            LOGGER.error(e.getMessage(),e);
        }finally {
            if(index <msgs.size()){
                context.setAckIndex(index+1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    @PreDestroy
    public void stop(){
        if(consumer !=null){
            consumer.shutdown();
            LOGGER.error("MQ:关闭消费者");
        }
    }
}

评论