消息队列——RabbitMQ
<h2><strong>1. 写在前面</strong></h2> <p>昨天简单学习了一个 <strong>消息队列</strong> 项目—— <strong>RabbitMQ</strong> ,今天趁热打铁,将学到的东西记录下来。</p> <p>学习的资料主要是官网给出的6个基本的消息发送/接收 模型 ,或者称为6种不同的使用场景,本文便是对这6种模型加以叙述。</p> <h2><strong>2. Tutorials</strong></h2> <p>在学习6种模型之前,我们首先需要安装RabbitMQ。RabbitMQ支持多种系统平台,各平台的安装方法。安装好之后,我们使用如下命令启用Web端的管理插件: rabbitmq-plugins enable rabbitmq_management ,然后启动RabbitMQ。接着用浏览器访问 http://localhost:15672/ ,若能看到RabbitMQ相关Web页面,说明启动成功。</p> <h3><strong>2.1 Hello World</strong></h3> <p>正所谓万事开头难,我们先从最简单的 <strong>Hello World</strong> 开始。首先当然是新建一个项目,导入RabiitMQ相关jar。我采用Maven来构建项目,因此只需要在pom文件中添加如下依赖:</p> <pre> <code class="language-dust"><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency></code></pre> <p>接下来学习最简单的消息队列模型,如下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/ec1c1ed3525c8971c26f12620578e287.png"></p> <p>在图中,P代表 producer,它是消息的 <strong>生产者</strong>;C代表 consumer ,它是消息的 <strong>消费者</strong> ;而红色的矩形正是我们所谓的 <strong>消息队列</strong> ,它位于 RabbitMQ 中( RabbitMQ 中可以有很多这样的队列,并且每个队列都有一个唯一的名字)。生产者(们)可以将消息发送到消息队列中,消费者(们)可以从消息队列中取出消息。</p> <p>这种模型是不是很简单呢?下面我们使用 <strong>Java</strong> ,借助于RabbitMQ来实现这种模型的消息通信。</p> <p>首先我们介绍如何 send 消息到消息队列。 send 之前,当然是和RabbitMQ服务器建立连接:</p> <pre> <code class="language-dust">ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection();</code></pre> <p>接下来我们创建一个 channel ,大多数API都是通过这个对象来调用的:</p> <pre> <code class="language-dust">Channel channel = connection.createChannel();</code></pre> <p>之后,我们便可以调用 channel 的如下方法去声明一个队列:</p> <pre> <code class="language-dust">channel.queueDeclare("hello", false, false, false, null);</code></pre> <p>该方法的第一个参数是队列的名称,其余的参数先不管,之后会介绍。我们可以尝试着去执行以上的5行代码,然后打开Web端,可以看到新建了一个叫作 hello 的队列:</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/665fba8e97cce8c23d613ee73c687ce8.png"></p> <p>有了队列,我们便可以向其中发送消息了,同样还是调用 channel 对象的API:</p> <pre> <code class="language-dust">channel.basicPublish("", "hello", null, "Hello World".getBytes());</code></pre> <p>以上代码所做的事情就是发送了一条字符串消息“Hello World”(第4个参数)到消息队列。你可能注意到我们调用了String对象的 getBytes 方法,没错,我们发送的实际上二进制数据。因此,理论上你能够发送任何数据到消息队列中,而不仅仅是文本信息。</p> <p>第2个参数叫做 <strong>路由键(routingKey)</strong> ,在该模型下必须与队列名相同,至于为什么,和其他参数一样,之后会了解到。</p> <p>我们可以修改发送的文本,再次执行上述代码,然后打开Web端查看,便可以查看到我们发送的消息:</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/5179cbeaa84d39c7b98d0d717b59f1f9.png"></p> <p>点击上图的 <strong>name</strong> 字段下的 <strong>hello</strong> ,可以查看 hello 队列中的具体信息:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/909a48393b1a4b18f5e672dc05a4421d.png"></p> <p>接下来,我们去尝试着去获取 <strong>生产者</strong> 发送的消息,和 send 方法一样,我们同样需要连接服务器,创建 channel ,声明队列:</p> <pre> <code class="language-dust">ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null);</code></pre> <p>之后我们可以调用 channel 的相关方法去监听队列,接收消息:</p> <pre> <code class="language-dust">channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } });</code></pre> <p>以上 basicConsume 方法中,第一个参数是队列的名字;第二个参数表示是否自动确认消息的接收情况,我们使用true,自动确认;第三个参数需要传入一个实现了 Consumer 接口的对象,我们简单的 new 一个默认的 Consumer 的实现类 DefaultConsumer ,然后在 handleDelivery 方法中去处理接收到的消息( handleDelivery 方法会在接收到消息时被回调)。</p> <p>运行以上代码,我们可以打印出之前向队列中 send 的数据:</p> <pre> <code class="language-dust">Hello World Hello World2</code></pre> <p>下面是 <strong>Hello World</strong> 的完整代码:</p> <pre> <code class="language-dust">public class App { @Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicPublish("", "hello", null, "Hello World2".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this){ // 因为以上接收消息的方法是异步的(非阻塞),当采用单元测试方式执行该方法时,程序会在打印消息前结束,因此使用wait来防止程序提前终止。若使用main方法执行,则不需要担心该问题。 wait(); } } }</code></pre> <h3><strong>2.2 Work queues</strong></h3> <p>接下来我们学习第二种模型—— <strong>Work Queues</strong> 。顾名思义,这种模型描述的是一个生产者(Boss)向队列发消息(任务),多个消费者(worker)从队列接受消息(任务),如下图所示:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/0fde8ffebb772c7d486921464ae55a29.png"></p> <p>下面我们用代码去实现。先是生产者 send 消息到队列,这次我们多发送些数据:</p> <pre> <code class="language-dust">@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); for (int i = 0; i < 9; i++) { channel.basicPublish("", "hello", null, String.valueOf(i).getBytes()); } channel.close(); connection.close(); }</code></pre> <p>然后是消费者接收数据:</p> <pre> <code class="language-dust">@Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); try { // Thread.sleep(1000); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); synchronized (this) { wait(); } }</code></pre> <p>代码基本上和 Hello World 的代码一样,只是加上句 sleep 来模拟消费者(worker)处理消息所花的时间。</p> <p>我们可以先执行三次 receive 方法(修改 sleep 的时间,其中消费者1 sleep 10s,消费者2,3 sleep 1s),让三个消费者(worker)一起等待消息的到来,然后执行 send 方法发送9条消息,观察三个消费者收到的消息情况。</p> <p>若不出意外,你会看到如下的打印结果:</p> <pre> <code class="language-dust">// 消费者1 0 // 10s 后 3 // 10s 后 6 // 消费者2 1 // 1s 后 4 // 1s 后 7 // 消费者3 2 // 1s 后 5 // 1s 后 8</code></pre> <p>通过打印结果,我们可以总结出 <strong>Work queues</strong> 的几个特点:</p> <ol> <li>一条消息只会被一个消费者接收;</li> <li>消息是平均分配给消费者的;</li> <li>消费者只有在处理完某条消息后,才会收到下一条消息。</li> </ol> <p>Work queues(Task Queuess)的概念在一些Web场景的应用中是很有用的,比如我们能够用它来构建一个master-slave结构的分布式爬虫系统:系统中有一个master节点和多个slave节点,master节点负责向各个slave节点分配爬取任务。</p> <h3><strong>2.3 Publish/Subscribe</strong></h3> <p>但有些时候,我们可能希望一条消息能够被多个消费者接受到,比如一些公告信息等,这时候用 <strong>Work Queue</strong> 模型显然不合适,而 <strong>Publish/Subscribe</strong> 模型正是对应这种使用场景的。</p> <p>在介绍Publish/Subscribe之前,我们快速回顾之前的两个模型,它们好像都是生产者将消息直接发送到消息队列,但其实不是这样的,甚至有可能生产者根本就不知道消息发送到了哪一个消息队列。</p> <p>先别着急,下面我们完整地介绍RabbitMQ消息发送/接受的方式。</p> <p>事实上,生产者是把消息发送到了 <strong>交换机(exchange)</strong> 中,然后交换机负责(决定)将消息发送到(哪一个)消息队列中。其模型如下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/78834739c797fabd64b5dbec0863752b.jpg"></p> <p>这时候你可能会疑惑:既然消息是被发送到了交换机中,那我们之前发送的消息是被发送到了哪一个交换机中了?它有没有机制能够让特定的消息发送到指定的队列?</p> <p>先回答第一个问题。还记得我们在 <strong>Hello World</strong> 中写的发送消息的代码吗?</p> <pre> <code class="language-dust">channel.basicPublish("", "hello", null, message.getBytes());</code></pre> <p>事实上第一个参数便是指定交换机的名字,即指定消息被发送到哪一个交换机。空字符串表示 <strong>默认交换机(Default Exchange)</strong> ,即我们之前发送的消息都是先发送到 <strong>默认交换机</strong> ,然后它再路由到相应的队列中。其实我们可以通过Web页面去查看所有存在的交换机:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/e559a4c7b1fe908d341347bb43a1b657.png"></p> <p>接着回答第二个问题。路由的依据便是通过第二个参数—— <strong>路由键(routing key)</strong> 指定的,之前已经提到过。在之前代码中,我们指定第二个参数为"hello",便是指定消息应该被交换机路由到路由键为hello的队列中。而 <strong>默认交换机(Default Exchange)</strong> 有一个非常有用的性质:</p> <p>每一个被创建的队列都会被自动的绑定到默认交换机上,并且路由键就是队列的名字。</p> <p>交换机还有4种不同的类型,分别是 direct , fanout , topic , headers ,每种类型路由的策略不同。</p> <p>direct 类型的交换机要求和它绑定的队列带有一个路由键K,若有一个带有路由键R的消息到达了交换机,交换机会将此消息路由到路由键K = R的队列。默认交换机便是该类型。因此,在下图中,消息会沿着绿色箭头路由:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/a84fa50921a68eb0a2fb6217d234340a.jpg"></p> <p>fanout 类型的交换机会路由每一条消息到所有和它绑定的队列,忽略路由键。</p> <p>剩下的两种类型之后再做介绍。</p> <p>在以上概念基础上,我们来看第3种消息模型: <strong>Publish/Subscribe</strong> 。如下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/59a7bcc69b4bf5388ff28f0790ca5b90.png"></p> <p>该模型是要让所有的消费者都能够接收到每一条消息。显然, fanout 类型的交换机更符合我们当前的需求。为此,先创建一个 fanout 类型的交换机。</p> <pre> <code class="language-dust">channel.exchangeDeclare("notice", "fanout");</code></pre> <p>其中,第一个参数是交换机的名称;第二个参数是交换机的类型。</p> <p>然后我们可以 send 消息了:</p> <pre> <code class="language-dust">channel.basicPublish( "notice", "", null, message.getBytes());</code></pre> <p>对于消费者,我们需要为每一个消费者创建一个独立的队列,然后将队列绑定到刚才指定的交换机上即可:</p> <pre> <code class="language-dust">// 该方法会创建一个名称随机的临时队列 String queueName = channel.queueDeclare().getQueue(); // 将队列绑定到指定的交换机("notice")上 channel.queueBind(queueName, "notice", "");</code></pre> <p>以下完整的代码:</p> <pre> <code class="language-dust">@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice", "fanout"); channel.basicPublish( "notice", "", null, "Hello China".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "notice", ""); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }</code></pre> <p>首先运行两次 receive 方法,让两个消费者等待接收消息,然后可以在Web端查看此时的队列情况,如下图所示:</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/b4f653f0ac8d6bd90784652ecd3f906a.png"></p> <p>可以看到图中有两个名称随机的队列。接着运行 send 方法发送一条消息,最终我们会看到两个消费者都打印出了 Hello China 。然后停止虚拟机让消费者断开连接,再次在Web端查看队列情况,会发现刚才的两个队列被自动删除了。</p> <h3><strong>2.4 Routing</strong></h3> <p>以上是Publish/Subscribe模式,它已经能让我们的通知(notice)系统正常运转了。现在再考虑这样一个新需求:对于一些机密通知,我们只想让部分人看到。这就要求交换机对绑定在其上的队列进行筛选,于是引出了又一个新的模型: <strong>Routing</strong> 。</p> <p>之前我们说过,对于 direct 类型的交换机,它会根据 <strong>routing key</strong> 进行路由,因此我们可以借助它来实现我们的需求,模型结构如下图:</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/05d48de4439d8e1a8d1778295a308652.jpg"></p> <p>下面用代码来实现。先看生产者。</p> <p>首先要声明一个 direct 类型的交换机:</p> <pre> <code class="language-dust">// 这里名称改为notice2 channel.exchangeDeclare("notice2", "direct");</code></pre> <p>需要注意的是, 因为我们之前声明了一个 fanout 类型的名叫 notice 的交换机,因此不能再声明一个同名的类型却不一样的交换机。</p> <p>然后可以发送消息了,我们发送10条消息,其中偶数条消息是秘密消息,只能被routing key 为s的队列接受,其余的消息所有的队列都能接受。</p> <pre> <code class="language-dust">for (int i = 0; i < 10; i++) { String routingKey = "n"; // normal if (i % 2 == 0) { routingKey = "s"; // secret } channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes()); }</code></pre> <p>接下来是消费者:</p> <pre> <code class="language-dust">// 声明一个名称随机的临时的队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机,同时带上routing key channel.queueBind(queueName, "notice2", "n"); // 消费者2号运行时,打开以下注释 // channel.queueBind(queueName, "notice2", "s");</code></pre> <p>注意,我们可以多次调用队列绑定方法,调用时,队列名和交换机名都相同,而routing key不同,这样可以使一个队列带有多个routing key。</p> <p>以下是完整代码:</p> <pre> <code class="language-dust">@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice2", "direct"); for (int i = 0; i < 10; i++) { String routingKey = "n"; // normal if (i % 2 == 0) { routingKey = "s"; // secret } channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes()); } channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice2", "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "notice2", "n"); // channel.queueBind(queueName, "notice2", "s"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }</code></pre> <p>测试时,我们可以先运行一个 receive ,然后打开 channel.queueBind(queueName, "notice2", "s") 注释,再运行一次 receive ,这样就有两个消费者绑定到notice2交换机上,其中消费者1只能收到normal类型的消息,而消费者2既能收到normal类型的消息,又能收到secret类型的消息。接着可以运行send方法。如不出意外,可以看到如下打印结果:</p> <pre> <code class="language-dust">// 消费者1 1 3 5 7 9 // 消费者2 0 1 2 3 4 5 6 7 8 9</code></pre> <h3>2.5 Topics</h3> <p>有了以上的改进,我们的 notice 系统基本ok了。但有些时候,我们还需要更加灵活的消息刷选方式。比如我们对于电影信息,我们可能需要对它的地区,类型,限制级进行筛选。这时候就要借助 <strong>Topics</strong> 模型了。</p> <p>在 <strong>Topics</strong> 模型中,我们“升级”了 <strong>routing key</strong> ,它可以由多个关键词组成,词与词之间由点号( . )隔开。特别地,规定 <strong> * 表示任意的一个词; # 号表示任意的0个或多个词 </strong> 。</p> <p>假设我们现在需要接收电影信息,每条电影消息附带的 <strong>routingKey</strong> 有地区、类型、限制级3个关键字,即: district.type.age 。现在想实现的功能如下图:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/1d388579b2fc933cd5fa7583e45e2ad6.jpg"></p> <p>如上图所示,队列Q1只关心美国适合13岁以上的电影信息,队列Q2对动作片感兴趣,而队列Q3喜欢中国电影。</p> <p>下面用 <strong>Java</strong> 代码去实现上述功能,相较于之前基本上没有什么改变,下面直接给出代码:</p> <pre> <code class="language-dust">@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("movie", "topic"); channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes()); channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes()); channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes()); channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龙".getBytes()); channel.basicPublish("movie", "Chinese.comedy.13", null, "大话西游".getBytes()); channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯与祝英台".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("movie", "topic"); // 队列1 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "movie", "American.*.13"); // 队列2 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", "*.action.*"); // 队列3 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", "Chinese.#"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }</code></pre> <p>运行3次 receive 方法,注意打开或关闭相应的注释;再运行 send 方法,可以看到控制台输出如下内容:</p> <pre> <code class="language-dust">// 消费者1 The Bourne Ultimatum Titanic // 消费者2 The Bourne Ultimatum 卧虎藏龙 // 消费者3 卧虎藏龙 大话西游 梁山伯与祝英台</code></pre> <h3>2.6 RPC</h3> <p>第6种模型是用来做RPC(Remote procedure call, 远程程序调用)的。这里直接贴上代码,就不做解释了 。代码演示的是,客户端调用服务端的 fib 方法,得到返回结果。</p> <p>RPCServer.java</p> <pre> <code class="language-dust">import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; /** * Description: * * @author derker * @Time 2016-10-26 18:24 */ public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); AMQP.BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }</code></pre> <p>RPCClient.java</p> <pre> <code class="language-dust">import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; /** * Description: * * @author derker * @Time 2016-10-26 18:36 */ public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(10)"); response = fibonacciRpc.call("10"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception ignore) { } } } } }</code></pre> <p> </p> <p>来自:http://www.cnblogs.com/dongkuo/p/6001791.html</p> <p> </p>
本文由用户 25r9n4qy8 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
转载本站原创文章,请注明出处,并保留原始链接、图片水印。
本站是一个以用户分享为主的开源技术平台,欢迎各类分享!