抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-消息发布均衡

发布均衡

​ 在 RabbitMQ 在设计的时候,特意让生产者和消费者“脱钩”,也就是消息的发布和消息的消费之间是解耦的

​ 在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。

​ 在 RabbitMQ 中实际项目中,生产者和消费者都是客户端,它们都可以完成申明交换器、申明队列和绑定关系,但是在我们的实战过程中,我们在生 产者代码中申明交换器,在消费者代码中申明队列和绑定关系。

​ 另外还要申明的就是,生产者发布消息时不一定非得需要消费者,对于 RabbitMQ 来说,如果是单纯的生产者你只需要生产者客户端、申明交换器、 申明队列、确定绑定关系,数据就能从生产者发送至 RabbitMQ。只是为了演示的方便,我们在例子中使用消费者消费队列中的数据来方便展示结果。

无保障

​ 在演示各种交换器中使用的就是无保障的方式,通过 basicPublish 发布你的消息并使用正确的交换器和路由信息,你的消息会被接收并发送到合适的队列中。但是如果有网络问题,或者消息不可路由,或者 RabbitMQ 自身有问题的话,这种方式就有风险。所以无保证的消息发送一般情况下不推荐。

失败确认

​ 不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。

​ 那么怎么保证我们消息发布的可靠性?这里我们就可以启动失败确认,在发送消息时设置mandatory标志,即可开启故障检测模式。

​ 注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

代码演示

下面我们就来看看实际的代码实现,首先我们需要设置上述所说的mandatory标志,我们在发送消息时,再增加一个参数即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//添加失败消息监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode + ",replyText: " + replyText + ",exchange: " + exchange + ",routingKey: " + routingKey + ",message: " + new String(body));
}
});
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes("UTF-8"));

System.out.println("sendMessage:" + key + "===" + message);
}
//休眠2秒用于接受失败确认的消息
Thread.sleep(1000);
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

​ 上述设置好mandatory标志后,然后我们就准备一个不存在的路由键发送消息(路由键名称可自行填写,只要未被消费者使用即可),另外这里注意简单的设置了一个睡眠2秒用于接受失败确认的消息。

失败者监听配置

失败确认设置完成了,下面的代码是为了添加失败消息的监听

1
2
3
4
5
6
7
8
//添加失败消息监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode + ",replyText: " + replyText + ",exchange: " + exchange + ",routingKey: " + routingKey + ",message: " + new String(body));
}
});

运行测试

我们发送消息发送了 key1,key2,key3,但是消费者 只消费了key1,原来的情况向 key2,key3 的消息被直接丢弃了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息1
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息2
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息4
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息5
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息7
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息8

其他失败通知

上述我们使用接受失败确认消息采用了监听器的方式,再RabbitMQ中还提供了其他的一些监听器

信道关闭时触发
1
2
3
4
5
6
channel.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println(cause.getMessage());
}
});
连接关闭时触发
1
2
3
4
5
6
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println(cause.getMessage());
}
});

事务消息

​ amqp协议提供的一种保证消息成功投递的方式通过将信道开启 transactional 模式并利用信道 Channel 的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递

使用步骤

事务的实现主要是对信道(Channel)的设置,主要分为启动事务、提交事务、回滚事务。

其主要的方法有三个:

  1. channel.txSelect() 声明启动事务模式;
  2. channel.txComment() 提交事务;
  3. channel.txRollback() 回滚事务;

原理

发送消息之前,需要声明channel为事务模式,开启事务后,客户端和RabbitMQ之间的通讯交互流程:

  • 客户端发送给服务器 tx.select (开启事务模式)
  • 服务器端返回 tx.select-ok(开启事务模式ok)
  • 推送消息
  • 客户端发送给事务提交 tx.commit
  • 服务器端返回 tx.commit-ok

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.132");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//添加失败消息监听

//启用事务消息
channel.txSelect();

//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息
try {
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
//等待消息发送状态
System.out.println("sendMessage:" + key + "===" + message);
//模拟发送出现异常事务回滚
//int result = 1 / 0;
}
//提交事务
channel.txCommit();
} catch (Exception e) {
//回滚事务
channel.txRollback();
}

//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

​ 以上就完成了事务的交互流程,如果其中任意一个环节出现问题,就会抛出IOException异常,这样用户就可以拦截异常进行事务回滚,或决定要不要重复消息。

注意

​ 不过需要注意的是,一般我们很少会使用到RabbitMQ的事务,因为AMQP协议层面虽然为我们提供了事务机制,但是事务机制本身也会带来问题:

  • 严重的性能问题
  • 使生产者应用程序产生同步

特别是其性能问题,根据相关资料,RabbitMQ事务会降低2~10倍的性能。

发送方确认

​ 我们提到失败确认只会让RabbitMQ向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

​ 那么我们如何能够进一步的来保证其消息的可靠性呢?这里我们就在想,要是RabbitMQ不仅仅在路由失败的时候给我们发送消息,并且能够在消息路由成功的时候也给我们发送消息就好了,这里RabbitMQ就为我们提供了该方案,即发送方确认模式

​ 基于事务的性能问题,RabbitMQ 团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计

注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。

原理

​ 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),由这个id在生产者和RabbitMQ之间进行消息的确认。

不可路由

发送方确认模式需要分两种情况下列来看,首先我们先来看一看消息不可路由的情况

​ 首先我们都知道,消息不可路由时,就不存在路由到队列了,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认操作。

​ 另外在这过程中,生产者接受到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,所以一般情况下这里会结合我们在RabbitMQ之失败确认中介绍过失败确认模式,这里一般会进行设置mandatory模式,失败则会调用addReturnListener监听器来进行处理。

可以路由

发送方确认模式的另一种情况肯定就是消息可以进行路由

​ 可路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

​ 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

确认模式共有三种方式

​ confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最 终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。

普通发送方确认模式

channel.waitForConfirms()普通发送方确认模式;每条消息都需要进行逐条确认,当消息到达交换器,就会返回true

代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//启用发送者确认模式
channel.confirmSelect();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//添加失败消息监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode + ",replyText: " + replyText + ",exchange: " + exchange + ",routingKey: " + routingKey + ",message: " + new String(body));
}
});
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes("UTF-8"));
//等待消息发送状态
boolean isSend = channel.waitForConfirms();
String sendText = "";
if (isSend) {
sendText = "发送成功";
} else {
sendText = "发送失败";
}
System.out.println("sendMessage:" + key + "===" + message + "," + sendText);
}
//休眠2秒用于接受失败确认的消息
Thread.sleep(2000);
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}
测试

这里我们就在失败确认上的代码基础上进行修改了下,如上添加了发送者确认,测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sendMessage:key1===hello 发送rabitmq消息0,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息1
sendMessage:key2===hello 发送rabitmq消息1,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息2
sendMessage:key3===hello 发送rabitmq消息2,发送成功
sendMessage:key1===hello 发送rabitmq消息3,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息4
sendMessage:key2===hello 发送rabitmq消息4,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息5
sendMessage:key3===hello 发送rabitmq消息5,发送成功
sendMessage:key1===hello 发送rabitmq消息6,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key2,message: hello 发送rabitmq消息7
sendMessage:key2===hello 发送rabitmq消息7,发送成功
replyCode: 312,replyText: NO_ROUTE,exchange: direct_exchange,routingKey: key3,message: hello 发送rabitmq消息8
sendMessage:key3===hello 发送rabitmq消息8,发送成功
sendMessage:key1===hello 发送rabitmq消息9,发送成功
批量确认模式

使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出IOException异常。

代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//启用发送者确认模式
channel.confirmSelect();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//添加失败消息监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode + ",replyText: " + replyText + ",exchange: " + exchange + ",routingKey: " + routingKey + ",message: " + new String(body));
}
});
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes("UTF-8"));
//等待消息发送状态
System.out.println("sendMessage:" + key + "===" + message);
}
try {
channel.waitForConfirmsOrDie();
} catch (Exception e) {
System.out.println("批量确认消息失败,errorMessage:" + e.getMessage());
}
//休眠2秒用于接受失败确认的消息
Thread.sleep(2000);
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

这里我们就可以在catch中进行一些业务处理,来处理消息发送失败后的动作。

异步监听发送方确认模式

channel.addConfirmListener()异步监听发送方确认模式

代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//添加失败消息监听

//启用发送者确认模式
channel.confirmSelect();
//添加消息发送确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}
});
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes("UTF-8"));
//等待消息发送状态
System.out.println("sendMessage:" + key + "===" + message);
}
//休眠2秒用于接受失败确认的消息
Thread.sleep(2000);
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

​ 上述添加的ConfirmListener中,我们实现两个方法,一个是发送成功的ack方法,另一个是发送失败的nack方法,其中都有两个参数deliveryTagmultiple,其中第一个表示消息的ID,由这个id在生产者和RabbitMQ之间进行消息的确认;第二个表示是否是批量确认。

测试

​ 在异步监听发送方确认模式下,将有RabbitMQ帮我们来决定是否批量发送,这里我们就是通过multiple来判断的,测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
消息发送成功,deliveryTag:2,multiple:false
消息发送成功,deliveryTag:3,multiple:false
消息发送成功,deliveryTag:1,multiple:false
消息发送成功,deliveryTag:5,multiple:false
消息发送成功,deliveryTag:6,multiple:false
消息发送成功,deliveryTag:8,multiple:false
消息发送成功,deliveryTag:9,multiple:false
消息发送成功,deliveryTag:10,multiple:true

从上述结果来看,我们共发送了9条消息,第一条消息(ID=1),multiple为false,是一条消息单独发送确认的。
而第二、三条消息时批量确认的,deliveryTag返回的是最后一条消息的ID,multiple表示是多条消息批量确认。
这个结果我们多运行几次,发现是又会不同的表现的,这里就有RabbitMQ为我们进行判断。

备用交换器

​ 当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。

​ 在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。

生产者配置

这里我们直接用代码来看,首先我们先来看看消息的生产者,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static String BACKUP_EXCHANGE_NAME = "backup_exchange";//备用交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.132");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();

//在信道中设置备用交换器
channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map map = new HashMap<>();
map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);

//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, map);


//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));

System.out.println("sendMessage:" + key + "===" + message);
}


//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

主消费者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static String BACKUP_EXCHANGE_NAME = "backup_exchange";//备用交换器名称
public final static String DIRECT_QUEUE = "direct_queue";

private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.132");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});


}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
Map map = new HashMap<>();
map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, map);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
String[] keys = new String[]{"key1"};
for (String key : keys) {
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, key);
}
System.out.println("主交换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("主交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:" + Thread.currentThread().getId());

}
};
//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, true, consumer);
}
}
注意

生产者和消费者的属性配置必须一致,需要将生产者的配置复制过来,否则将会出现如下错误

1
2
3
4
Map map = new HashMap<>();
map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, map);

还有一种办法就是将声明交换器的代码注释掉

1
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, map);

​ 一般就索性直接将其注释,不创建交换器了,但是这里注释掉也会有点小问题,就是我们在第一次如果先启动该消费者也会报错,需要先启动下的消息生产者才可,因为我们需要创建交换器呀。

备用交换器配置

在备用交换器中,我们一般定义为FANOUT模式,然后定义了一个路由键#,让其消费者来接受这些无法路由的消息进行相关的处理。

​ 我们消息生产者发送了key1、key2、key3三种消息,但是消息的消费者只能消费key1消息,那么其key2、key3这两个消息主交换器就无法进行路由,那么就会被路由到备用交换器,其备用交换器如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class BackUpConsumer {
public final static String BACKUP_EXCHANGE_NAME = "backup_exchange";//备用交换器名称
public final static String BACKUP_QUEUE = "backup_queue";

private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.132");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});


}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueDeclare(BACKUP_QUEUE, false, false, false, null);

channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE_NAME, "#");

System.out.println("备用交换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("备用交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:" + Thread.currentThread().getId());

}
};
//消费者在指定的对队列上消费
channel.basicConsume(BACKUP_QUEUE, true, consumer);
}
}

测试

生产者
1
2
3
4
5
6
7
8
9
10
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
主消费者
1
2
3
4
主交换器 Received:key1========hello 发送rabitmq消息0,ThreadId:20
主交换器 Received:key1========hello 发送rabitmq消息3,ThreadId:21
主交换器 Received:key1========hello 发送rabitmq消息6,ThreadId:21
主交换器 Received:key1========hello 发送rabitmq消息9,ThreadId:21
备用消费者
1
2
3
4
5
6
备用交换器 Received:key2========hello 发送rabitmq消息1,ThreadId:19
备用交换器 Received:key3========hello 发送rabitmq消息2,ThreadId:19
备用交换器 Received:key2========hello 发送rabitmq消息4,ThreadId:20
备用交换器 Received:key3========hello 发送rabitmq消息5,ThreadId:20
备用交换器 Received:key2========hello 发送rabitmq消息7,ThreadId:20
备用交换器 Received:key3========hello 发送rabitmq消息8,ThreadId:20

评论