抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-请求/响应模式

概述

​ 如果你正在进行web服务编程,那么最常用的模式是请求-响应模式。这种模式总是由客户端发起,然后等待服务器端的响应。如果客户端想发送一些信息给服务器,或者客户端按照某些标准请求一些信息,那么这种模式非常适合。如果服务器想自己发送信息给客户端,那么这种模式就非常不适合。这时我们就必须依赖像长轮询或者web挂钩这样的对HTTP进行某种程度扩展的技巧了。

​ 而对消息系统来说,最常用的模式是发送-接收模式。生产者节点发布一条信息,接下来这条信息会被传送给消费节点。这儿没有纯粹的客户端或者服务器的概念;节点可以是生产者,也可以是消费者,或者二者兼有。当一个节点想发送一些信息给另一个节点或者相反,这种模式都运行的非常好,不过,如果一个节点想按照某些标准向另一个节点请求信息,那么这种模式就不是很适合。

请求/响应模式

​ 然而,这一切并不是完全做不到。我们可以模仿请求-应答模式:让客户端创建一个应答队列,这个队列存储客户端发送给服务器的查询消息的应答。客户端可以设置请求消息的reply_to属性字段为应答队列名。服务器检查reply_to子段,然后通过默认的整理中心把应答消息发布给应答队列,接着这个消息就由客户端接收。

​ 请求端的实现很简单;它看起来就像标准的发送-接收模式。而对于应答端,我们可以有多个选择。如果你通过谷歌搜寻”RabbitMQ RPC”或者”RabbitMQ request response”,你就发现有关应答队列的性质方面有几个不同的意见:

  • 每个请求是不是应该对应一个应答队列,或者客户端对多个请求是不是应该只维护一个应答队列?

  • 应答队列是不是应该是独享的,即只可用于一个通道,或者应该不是独享的?注意:当通道关闭的时候,独享队列应该删除,无论是有意,还是网络中断或者转发网关失效,都能引起连接丢失。

每个请求独享一个应答队列

​ 这种情况下,每个请求创建一个应答队列。好处是实现起来简单。把响应与请求关联起来也没有问题,因为每个请求都有它自己对应的响应消费者。如果客户端与代理之间的连接在响应接收之前就断开了,那么代理就会清除掉剩余的应答队列,这时就会丢失响应的消息。

​ 实现这个的主要问题是:倘若由于服务器问题引起服务器无法发布响应,那么我们必须清除所有的应答队列。

​ 这种方式有很大的性能开销,因为它对每个请求都要创建一个新的队列和消费者。

每个客户端独享一个应答队列

​ 这种情况下,每个客户端连接维护着由许多请求共享的应答队列。这减少了对每个请求都要创建队列和消费者所造成的性能开销,不过它增加了客户端需要跟踪应答队列并且把应答与各自对应的请求匹配方面的开销。处理这个的标准办法是使用关联id,服务器可以从响应所对应的请求里拷贝这个id。

​ 再次说明一下,在客户端断开的时候,删除应答队列没有任何问题,因为转发网关会自动删除队列的。然而,这确实意味着断开的那个时刻的仍在传输的任何响应都将丢失。

永久应答队列

​ 上面两种情形都存在这样的问题:如果客户端和转发网关之间的连接断开,并且响应还处在运行状态,那么这些响应就会丢失。这是因为它们使用的是独享型的队列,也就是说,当拥有这个队列的连接关闭的时候,转发网关必须删除这个队列。

​ 针对这个问题的常见的解决办法就是使用非独享型的应答队列。不过这会引起一些管理开销。你需要采用某种方式命名应答队列,并把它与特定的客户端关联起来。问题是:客户端很难知道一个应答队列是属于自己的,还是属于另一个客户端的。不过随意地创建一个可把响应发送给不对应的客户端这样的环境却非常容易。你可能最终要手工创建和命名响应队列,这么做就没有了第一种情况下可根据消息选择代理的好处了。

代码实现

抽象父类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public abstract class AbstractRabbitMQ {

//连接
private Connection connection;
//通道
private Channel channel;

private java.util.function.Consumer<String> publish;

public AbstractRabbitMQ() {
init();
}

//初始化方法
public void init() {
//初始化连接
initConnection();
//初始化通道
initChannel();
//通道配置
channelConfig(channel);
//初始化消费者
initConsumer(channel);
//初始化发布者
publish = initPublish(channel);
}

/**
* 初始化连接
*/
public void initConnection() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.134");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = null;
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
this.connection = connection;
}

/**
* 初始化通道
*/
public void initChannel() {
if (null != connection) {
try {
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 发送消息
*
* @param message
*/
public void sendMessage(String message) {
if (null != publish) {
publish.accept(message);
}
}


/**
* 信道配置
*
* @param channel
*/
public abstract Channel channelConfig(Channel channel);

/**
* 初始化发布者
*
* @param channel
* @return
*/
public java.util.function.Consumer<String> initPublish(Channel channel) {
return null;
}

/**
* 初始化消费者
*
* @param channel
*/
public void initConsumer(Channel channel) {

}

/**
* 关闭
*/
public void close() {
if (null != channel && connection.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}

}
}
if (null != connection && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class RabbitMQProducer extends AbstractRabbitMQ {
public final static String EXCHANGE_NAME = "REPLY_TO_EXCHANGE";//topic交换器名称

private final static String requestQueueName = "requestQueue";

private final static String responseQueueName = "responseQueue";

/**
* 通道配置
*
* @param channel
* @return
*/
@Override
public Channel channelConfig(Channel channel) {
try {
//声明直接交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明request队列
channel.queueDeclare(requestQueueName, false, false, false, null);
//声明response队列
channel.queueDeclare(responseQueueName, false, false, false, null);
//绑定交换器
channel.queueBind(requestQueueName, EXCHANGE_NAME, "");
} catch (IOException e) {
e.printStackTrace();
}
//创建死信交换器
return channel;
}

/**
* 初始化生产者
*
* @param channel
* @return
*/
@Override
public Consumer<String> initPublish(Channel channel) {
try {
channel.queueDeclare().getQueue();
} catch (IOException e) {
e.printStackTrace();
}
return (message) -> {
//设置消息属性设置回复队列 responseQueue
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().replyTo(responseQueueName).build();
try {
channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes());
System.out.println("发送消息:" + message);
} catch (IOException e) {
e.printStackTrace();
}
};
}

/**
* 初始化消费者
*
* @param channel
*/
@Override
public void initConsumer(Channel channel) {
try {
System.out.println("开始监听消息....");
channel.basicConsume(responseQueueName, true, new com.rabbitmq.client.DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("接收到回复消息:" + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
}

}

public static void main(String[] args) {
RabbitMQProducer mqProducer = new RabbitMQProducer();
mqProducer.sendMessage("xxxxxxx");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class RabbitMQConsumer extends AbstractRabbitMQ {

public final static String EXCHANGE_NAME = "REPLY_TO_EXCHANGE";//topic交换器名称

private final static String requestQueueName = "requestQueue";

private final static String responseQueueName = "responseQueue";

/**
* 通道配置
*
* @param channel
* @return
*/
@Override
public Channel channelConfig(Channel channel) {


try {
//声明直接交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明request队列
channel.queueDeclare(requestQueueName, false, false, false, null);
//声明response队列
channel.queueDeclare(responseQueueName, false, false, false, null);
//绑定交换器
channel.queueBind(requestQueueName, EXCHANGE_NAME, "");
} catch (IOException e) {
e.printStackTrace();
}

return channel;
}

/**
* 初始化消费者
*
* @param channel
*/
@Override
public void initConsumer(Channel channel) {
try {
System.out.println("开始监听消息....");
channel.basicConsume(requestQueueName, true, new com.rabbitmq.client.DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("接收系统消息:" + msg);
//收到消息回复
channel.basicPublish("", properties.getReplyTo(), properties, "OK".getBytes());
}
});
} catch (IOException e) {
e.printStackTrace();
}

}

public static void main(String[] args) {
RabbitMQConsumer mqConsumer = new RabbitMQConsumer();
}
}

测试

生产者日志

1
2
3
开始监听消息....
发送消息:xxxxxxx
接收到回复消息:OK

消费者日志

1
2
开始监听消息....
接收系统消息:xxxxxxx

评论