RabbitMQ-请求/响应模式
概述
如果你正在进行web服务编程,那么最常用的模式是请求-响应模式。这种模式总是由客户端发起,然后等待服务器端的响应。如果客户端想发送一些信息给服务器,或者客户端按照某些标准请求一些信息,那么这种模式非常适合。如果服务器想自己发送信息给客户端,那么这种模式就非常不适合。这时我们就必须依赖像长轮询或者web挂钩这样的对HTTP进行某种程度扩展的技巧了。
而对消息系统来说,最常用的模式是发送-接收模式。生产者节点发布一条信息,接下来这条信息会被传送给消费节点。这儿没有纯粹的客户端或者服务器的概念;节点可以是生产者,也可以是消费者,或者二者兼有。当一个节点想发送一些信息给另一个节点或者相反,这种模式都运行的非常好,不过,如果一个节点想按照某些标准向另一个节点请求信息,那么这种模式就不是很适合。
请求/响应模式
然而,这一切并不是完全做不到。我们可以模仿请求-应答模式:让客户端创建一个应答队列,这个队列存储客户端发送给服务器的查询消息的应答。客户端可以设置请求消息的reply_to属性字段为应答队列名。服务器检查reply_to子段,然后通过默认的整理中心把应答消息发布给应答队列,接着这个消息就由客户端接收。
请求端的实现很简单;它看起来就像标准的发送-接收模式。而对于应答端,我们可以有多个选择。如果你通过谷歌搜寻”RabbitMQ RPC”或者”RabbitMQ request response”,你就发现有关应答队列的性质方面有几个不同的意见:
每个请求独享一个应答队列
这种情况下,每个请求创建一个应答队列。好处是实现起来简单。把响应与请求关联起来也没有问题,因为每个请求都有它自己对应的响应消费者。如果客户端与代理之间的连接在响应接收之前就断开了,那么代理就会清除掉剩余的应答队列,这时就会丢失响应的消息。
实现这个的主要问题是:倘若由于服务器问题引起服务器无法发布响应,那么我们必须清除所有的应答队列。
这种方式有很大的性能开销,因为它对每个请求都要创建一个新的队列和消费者。
每个客户端独享一个应答队列
这种情况下,每个客户端连接维护着由许多请求共享的应答队列。这减少了对每个请求都要创建队列和消费者所造成的性能开销,不过它增加了客户端需要跟踪应答队列并且把应答与各自对应的请求匹配方面的开销。处理这个的标准办法是使用关联id,服务器可以从响应所对应的请求里拷贝这个id。
再次说明一下,在客户端断开的时候,删除应答队列没有任何问题,因为转发网关会自动删除队列的。然而,这确实意味着断开的那个时刻的仍在传输的任何响应都将丢失。
永久应答队列
上面两种情形都存在这样的问题:如果客户端和转发网关之间的连接断开,并且响应还处在运行状态,那么这些响应就会丢失。这是因为它们使用的是独享型的队列,也就是说,当拥有这个队列的连接关闭的时候,转发网关必须删除这个队列。
针对这个问题的常见的解决办法就是使用非独享型的应答队列。不过这会引起一些管理开销。你需要采用某种方式命名应答队列,并把它与特定的客户端关联起来。问题是:客户端很难知道一个应答队列是属于自己的,还是属于另一个客户端的。不过随意地创建一个可把响应发送给不对应的客户端这样的环境却非常容易。你可能最终要手工创建和命名响应队列,这么做就没有了第一种情况下可根据消息选择代理的好处了。
代码实现
抽象父类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| public abstract class AbstractRabbitMQ {
private Connection connection; private Channel channel;
private java.util.function.Consumer<String> publish;
public AbstractRabbitMQ() { init(); }
public void init() { initConnection(); initChannel(); channelConfig(channel); initConsumer(channel); publish = initPublish(channel); }
public void initConnection() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.64.134"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setPort(5672); Connection connection = null; try { connection = connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } this.connection = connection; }
public void initChannel() { if (null != connection) { try { channel = connection.createChannel(); } catch (IOException e) { e.printStackTrace(); } } }
public void sendMessage(String message) { if (null != publish) { publish.accept(message); } }
public abstract Channel channelConfig(Channel channel);
public java.util.function.Consumer<String> initPublish(Channel channel) { return null; }
public void initConsumer(Channel channel) {
}
public void close() { if (null != channel && connection.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { connection.close(); } catch (IOException e) { e.printStackTrace(); }
} } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public class RabbitMQProducer extends AbstractRabbitMQ { public final static String EXCHANGE_NAME = "REPLY_TO_EXCHANGE";
private final static String requestQueueName = "requestQueue";
private final static String responseQueueName = "responseQueue";
@Override public Channel channelConfig(Channel channel) { try { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(requestQueueName, false, false, false, null); channel.queueDeclare(responseQueueName, false, false, false, null); channel.queueBind(requestQueueName, EXCHANGE_NAME, ""); } catch (IOException e) { e.printStackTrace(); } return channel; }
@Override public Consumer<String> initPublish(Channel channel) { try { channel.queueDeclare().getQueue(); } catch (IOException e) { e.printStackTrace(); } return (message) -> { AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().replyTo(responseQueueName).build(); try { channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes()); System.out.println("发送消息:" + message); } catch (IOException e) { e.printStackTrace(); } }; }
@Override public void initConsumer(Channel channel) { try { System.out.println("开始监听消息...."); channel.basicConsume(responseQueueName, true, new com.rabbitmq.client.DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("接收到回复消息:" + msg); } }); } catch (IOException e) { e.printStackTrace(); }
}
public static void main(String[] args) { RabbitMQProducer mqProducer = new RabbitMQProducer(); mqProducer.sendMessage("xxxxxxx"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class RabbitMQConsumer extends AbstractRabbitMQ {
public final static String EXCHANGE_NAME = "REPLY_TO_EXCHANGE";
private final static String requestQueueName = "requestQueue";
private final static String responseQueueName = "responseQueue";
@Override public Channel channelConfig(Channel channel) {
try { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(requestQueueName, false, false, false, null); channel.queueDeclare(responseQueueName, false, false, false, null); channel.queueBind(requestQueueName, EXCHANGE_NAME, ""); } catch (IOException e) { e.printStackTrace(); }
return channel; }
@Override public void initConsumer(Channel channel) { try { System.out.println("开始监听消息...."); channel.basicConsume(requestQueueName, true, new com.rabbitmq.client.DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("接收系统消息:" + msg); channel.basicPublish("", properties.getReplyTo(), properties, "OK".getBytes()); } }); } catch (IOException e) { e.printStackTrace(); }
}
public static void main(String[] args) { RabbitMQConsumer mqConsumer = new RabbitMQConsumer(); } }
|
测试
生产者日志
1 2 3
| 开始监听消息.... 发送消息:xxxxxxx 接收到回复消息:OK
|
消费者日志
1 2
| 开始监听消息.... 接收系统消息:xxxxxxx
|