| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
25r9n4qy8
8年前发布

消息队列——RabbitMQ

   <h2><strong>1. 写在前面</strong></h2>    <p>昨天简单学习了一个 <strong>消息队列</strong> 项目—— <strong>RabbitMQ</strong> ,今天趁热打铁,将学到的东西记录下来。</p>    <p>学习的资料主要是官网给出的6个基本的消息发送/接收 模型 ,或者称为6种不同的使用场景,本文便是对这6种模型加以叙述。</p>    <h2><strong>2. Tutorials</strong></h2>    <p>在学习6种模型之前,我们首先需要安装RabbitMQ。RabbitMQ支持多种系统平台,各平台的安装方法。安装好之后,我们使用如下命令启用Web端的管理插件: rabbitmq-plugins enable rabbitmq_management ,然后启动RabbitMQ。接着用浏览器访问 http://localhost:15672/ ,若能看到RabbitMQ相关Web页面,说明启动成功。</p>    <h3><strong>2.1 Hello World</strong></h3>    <p>正所谓万事开头难,我们先从最简单的 <strong>Hello World</strong> 开始。首先当然是新建一个项目,导入RabiitMQ相关jar。我采用Maven来构建项目,因此只需要在pom文件中添加如下依赖:</p>    <pre>  <code class="language-dust"><dependency>      <groupId>com.rabbitmq</groupId>      <artifactId>amqp-client</artifactId>      <version>3.6.5</version>  </dependency></code></pre>    <p>接下来学习最简单的消息队列模型,如下图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/ec1c1ed3525c8971c26f12620578e287.png"></p>    <p>在图中,P代表 producer,它是消息的 <strong>生产者</strong>;C代表 consumer ,它是消息的 <strong>消费者</strong> ;而红色的矩形正是我们所谓的 <strong>消息队列</strong> ,它位于 RabbitMQ 中( RabbitMQ 中可以有很多这样的队列,并且每个队列都有一个唯一的名字)。生产者(们)可以将消息发送到消息队列中,消费者(们)可以从消息队列中取出消息。</p>    <p>这种模型是不是很简单呢?下面我们使用 <strong>Java</strong> ,借助于RabbitMQ来实现这种模型的消息通信。</p>    <p>首先我们介绍如何 send 消息到消息队列。 send 之前,当然是和RabbitMQ服务器建立连接:</p>    <pre>  <code class="language-dust">ConnectionFactory factory = new ConnectionFactory();  factory.setHost("localhost");  Connection connection = factory.newConnection();</code></pre>    <p>接下来我们创建一个 channel ,大多数API都是通过这个对象来调用的:</p>    <pre>  <code class="language-dust">Channel channel = connection.createChannel();</code></pre>    <p>之后,我们便可以调用 channel 的如下方法去声明一个队列:</p>    <pre>  <code class="language-dust">channel.queueDeclare("hello", false, false, false, null);</code></pre>    <p>该方法的第一个参数是队列的名称,其余的参数先不管,之后会介绍。我们可以尝试着去执行以上的5行代码,然后打开Web端,可以看到新建了一个叫作 hello 的队列:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/665fba8e97cce8c23d613ee73c687ce8.png"></p>    <p>有了队列,我们便可以向其中发送消息了,同样还是调用 channel 对象的API:</p>    <pre>  <code class="language-dust">channel.basicPublish("", "hello", null, "Hello World".getBytes());</code></pre>    <p>以上代码所做的事情就是发送了一条字符串消息“Hello World”(第4个参数)到消息队列。你可能注意到我们调用了String对象的 getBytes 方法,没错,我们发送的实际上二进制数据。因此,理论上你能够发送任何数据到消息队列中,而不仅仅是文本信息。</p>    <p>第2个参数叫做 <strong>路由键(routingKey)</strong> ,在该模型下必须与队列名相同,至于为什么,和其他参数一样,之后会了解到。</p>    <p>我们可以修改发送的文本,再次执行上述代码,然后打开Web端查看,便可以查看到我们发送的消息:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/5179cbeaa84d39c7b98d0d717b59f1f9.png"></p>    <p>点击上图的 <strong>name</strong> 字段下的 <strong>hello</strong> ,可以查看 hello 队列中的具体信息:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/909a48393b1a4b18f5e672dc05a4421d.png"></p>    <p>接下来,我们去尝试着去获取 <strong>生产者</strong> 发送的消息,和 send 方法一样,我们同样需要连接服务器,创建 channel ,声明队列:</p>    <pre>  <code class="language-dust">ConnectionFactory factory = new ConnectionFactory();  factory.setHost("localhost");  Connection connection = factory.newConnection();  Channel channel = connection.createChannel();  channel.queueDeclare("hello", false, false, false, null);</code></pre>    <p>之后我们可以调用 channel 的相关方法去监听队列,接收消息:</p>    <pre>  <code class="language-dust">channel.basicConsume("hello", true, new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {              System.out.println(new String(body, "UTF-8"));          }  });</code></pre>    <p>以上 basicConsume 方法中,第一个参数是队列的名字;第二个参数表示是否自动确认消息的接收情况,我们使用true,自动确认;第三个参数需要传入一个实现了 Consumer 接口的对象,我们简单的 new 一个默认的 Consumer 的实现类 DefaultConsumer ,然后在 handleDelivery 方法中去处理接收到的消息( handleDelivery 方法会在接收到消息时被回调)。</p>    <p>运行以上代码,我们可以打印出之前向队列中 send 的数据:</p>    <pre>  <code class="language-dust">Hello World  Hello World2</code></pre>    <p>下面是 <strong>Hello World</strong> 的完整代码:</p>    <pre>  <code class="language-dust">public class App {        @Test      public void send() throws IOException, TimeoutException {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.queueDeclare("hello", false, false, false, null);            channel.basicPublish("", "hello", null, "Hello World2".getBytes());            channel.close();          connection.close();      }        @Test      public void receive() throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.queueDeclare("hello", false, false, false, null);            channel.basicConsume("hello", true, new DefaultConsumer(channel) {              @Override              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                  System.out.println(new String(body, "UTF-8"));              }            });          synchronized (this){              // 因为以上接收消息的方法是异步的(非阻塞),当采用单元测试方式执行该方法时,程序会在打印消息前结束,因此使用wait来防止程序提前终止。若使用main方法执行,则不需要担心该问题。              wait();          }      }  }</code></pre>    <h3><strong>2.2 Work queues</strong></h3>    <p>接下来我们学习第二种模型—— <strong>Work Queues</strong> 。顾名思义,这种模型描述的是一个生产者(Boss)向队列发消息(任务),多个消费者(worker)从队列接受消息(任务),如下图所示:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/0fde8ffebb772c7d486921464ae55a29.png"></p>    <p>下面我们用代码去实现。先是生产者 send 消息到队列,这次我们多发送些数据:</p>    <pre>  <code class="language-dust">@Test  public void send() throws IOException, TimeoutException {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.queueDeclare("hello", false, false, false, null);        for (int i = 0; i < 9; i++) {          channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());      }        channel.close();      connection.close();  }</code></pre>    <p>然后是消费者接收数据:</p>    <pre>  <code class="language-dust">@Test  public void receive() throws Exception {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.queueDeclare("hello", false, false, false, null);        channel.basicConsume("hello", true, new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {              System.out.println(new String(body, "UTF-8"));              try {              //  Thread.sleep(1000);                  Thread.sleep(10000);              } catch (InterruptedException e) {                  e.printStackTrace();              }          }      });      synchronized (this) {          wait();      }  }</code></pre>    <p>代码基本上和 Hello World 的代码一样,只是加上句 sleep 来模拟消费者(worker)处理消息所花的时间。</p>    <p>我们可以先执行三次 receive 方法(修改 sleep 的时间,其中消费者1 sleep 10s,消费者2,3 sleep 1s),让三个消费者(worker)一起等待消息的到来,然后执行 send 方法发送9条消息,观察三个消费者收到的消息情况。</p>    <p>若不出意外,你会看到如下的打印结果:</p>    <pre>  <code class="language-dust">// 消费者1  0  // 10s 后  3  // 10s 后  6  // 消费者2  1  // 1s 后  4  // 1s 后  7  // 消费者3  2  // 1s 后  5  // 1s 后  8</code></pre>    <p>通过打印结果,我们可以总结出 <strong>Work queues</strong> 的几个特点:</p>    <ol>     <li>一条消息只会被一个消费者接收;</li>     <li>消息是平均分配给消费者的;</li>     <li>消费者只有在处理完某条消息后,才会收到下一条消息。</li>    </ol>    <p>Work queues(Task Queuess)的概念在一些Web场景的应用中是很有用的,比如我们能够用它来构建一个master-slave结构的分布式爬虫系统:系统中有一个master节点和多个slave节点,master节点负责向各个slave节点分配爬取任务。</p>    <h3><strong>2.3 Publish/Subscribe</strong></h3>    <p>但有些时候,我们可能希望一条消息能够被多个消费者接受到,比如一些公告信息等,这时候用 <strong>Work Queue</strong> 模型显然不合适,而 <strong>Publish/Subscribe</strong> 模型正是对应这种使用场景的。</p>    <p>在介绍Publish/Subscribe之前,我们快速回顾之前的两个模型,它们好像都是生产者将消息直接发送到消息队列,但其实不是这样的,甚至有可能生产者根本就不知道消息发送到了哪一个消息队列。</p>    <p>先别着急,下面我们完整地介绍RabbitMQ消息发送/接受的方式。</p>    <p>事实上,生产者是把消息发送到了 <strong>交换机(exchange)</strong> 中,然后交换机负责(决定)将消息发送到(哪一个)消息队列中。其模型如下图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/78834739c797fabd64b5dbec0863752b.jpg"></p>    <p>这时候你可能会疑惑:既然消息是被发送到了交换机中,那我们之前发送的消息是被发送到了哪一个交换机中了?它有没有机制能够让特定的消息发送到指定的队列?</p>    <p>先回答第一个问题。还记得我们在 <strong>Hello World</strong> 中写的发送消息的代码吗?</p>    <pre>  <code class="language-dust">channel.basicPublish("", "hello", null, message.getBytes());</code></pre>    <p>事实上第一个参数便是指定交换机的名字,即指定消息被发送到哪一个交换机。空字符串表示 <strong>默认交换机(Default Exchange)</strong> ,即我们之前发送的消息都是先发送到 <strong>默认交换机</strong> ,然后它再路由到相应的队列中。其实我们可以通过Web页面去查看所有存在的交换机:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/e559a4c7b1fe908d341347bb43a1b657.png"></p>    <p>接着回答第二个问题。路由的依据便是通过第二个参数—— <strong>路由键(routing key)</strong> 指定的,之前已经提到过。在之前代码中,我们指定第二个参数为"hello",便是指定消息应该被交换机路由到路由键为hello的队列中。而 <strong>默认交换机(Default Exchange)</strong> 有一个非常有用的性质:</p>    <p>每一个被创建的队列都会被自动的绑定到默认交换机上,并且路由键就是队列的名字。</p>    <p>交换机还有4种不同的类型,分别是 direct , fanout , topic , headers ,每种类型路由的策略不同。</p>    <p>direct 类型的交换机要求和它绑定的队列带有一个路由键K,若有一个带有路由键R的消息到达了交换机,交换机会将此消息路由到路由键K = R的队列。默认交换机便是该类型。因此,在下图中,消息会沿着绿色箭头路由:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/a84fa50921a68eb0a2fb6217d234340a.jpg"></p>    <p>fanout 类型的交换机会路由每一条消息到所有和它绑定的队列,忽略路由键。</p>    <p>剩下的两种类型之后再做介绍。</p>    <p>在以上概念基础上,我们来看第3种消息模型: <strong>Publish/Subscribe</strong> 。如下图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/59a7bcc69b4bf5388ff28f0790ca5b90.png"></p>    <p>该模型是要让所有的消费者都能够接收到每一条消息。显然, fanout 类型的交换机更符合我们当前的需求。为此,先创建一个 fanout 类型的交换机。</p>    <pre>  <code class="language-dust">channel.exchangeDeclare("notice", "fanout");</code></pre>    <p>其中,第一个参数是交换机的名称;第二个参数是交换机的类型。</p>    <p>然后我们可以 send 消息了:</p>    <pre>  <code class="language-dust">channel.basicPublish( "notice", "", null, message.getBytes());</code></pre>    <p>对于消费者,我们需要为每一个消费者创建一个独立的队列,然后将队列绑定到刚才指定的交换机上即可:</p>    <pre>  <code class="language-dust">// 该方法会创建一个名称随机的临时队列  String queueName = channel.queueDeclare().getQueue();  // 将队列绑定到指定的交换机("notice")上  channel.queueBind(queueName, "notice", "");</code></pre>    <p>以下完整的代码:</p>    <pre>  <code class="language-dust">@Test  public void send() throws IOException, TimeoutException {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.exchangeDeclare("notice", "fanout");      channel.basicPublish( "notice", "", null, "Hello China".getBytes());      channel.close();      connection.close();  }    @Test  public void receive() throws Exception {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.exchangeDeclare("notice", "fanout");      String queueName = channel.queueDeclare().getQueue();      channel.queueBind(queueName, "notice", "");        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {              System.out.println(new String(body, "UTF-8"));          }        });      synchronized (this) {          wait();      }  }</code></pre>    <p>首先运行两次 receive 方法,让两个消费者等待接收消息,然后可以在Web端查看此时的队列情况,如下图所示:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/b4f653f0ac8d6bd90784652ecd3f906a.png"></p>    <p>可以看到图中有两个名称随机的队列。接着运行 send 方法发送一条消息,最终我们会看到两个消费者都打印出了 Hello China 。然后停止虚拟机让消费者断开连接,再次在Web端查看队列情况,会发现刚才的两个队列被自动删除了。</p>    <h3><strong>2.4 Routing</strong></h3>    <p>以上是Publish/Subscribe模式,它已经能让我们的通知(notice)系统正常运转了。现在再考虑这样一个新需求:对于一些机密通知,我们只想让部分人看到。这就要求交换机对绑定在其上的队列进行筛选,于是引出了又一个新的模型: <strong>Routing</strong> 。</p>    <p>之前我们说过,对于 direct 类型的交换机,它会根据 <strong>routing key</strong> 进行路由,因此我们可以借助它来实现我们的需求,模型结构如下图:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/05d48de4439d8e1a8d1778295a308652.jpg"></p>    <p>下面用代码来实现。先看生产者。</p>    <p>首先要声明一个 direct 类型的交换机:</p>    <pre>  <code class="language-dust">// 这里名称改为notice2  channel.exchangeDeclare("notice2", "direct");</code></pre>    <p>需要注意的是, 因为我们之前声明了一个 fanout 类型的名叫 notice 的交换机,因此不能再声明一个同名的类型却不一样的交换机。</p>    <p>然后可以发送消息了,我们发送10条消息,其中偶数条消息是秘密消息,只能被routing key 为s的队列接受,其余的消息所有的队列都能接受。</p>    <pre>  <code class="language-dust">for (int i = 0; i < 10; i++) {              String routingKey = "n"; // normal              if (i % 2 == 0) {                  routingKey = "s"; // secret              }              channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());          }</code></pre>    <p>接下来是消费者:</p>    <pre>  <code class="language-dust">// 声明一个名称随机的临时的队列  String queueName = channel.queueDeclare().getQueue();  // 绑定交换机,同时带上routing key  channel.queueBind(queueName, "notice2", "n");  // 消费者2号运行时,打开以下注释  // channel.queueBind(queueName, "notice2", "s");</code></pre>    <p>注意,我们可以多次调用队列绑定方法,调用时,队列名和交换机名都相同,而routing key不同,这样可以使一个队列带有多个routing key。</p>    <p>以下是完整代码:</p>    <pre>  <code class="language-dust">@Test  public void send() throws IOException, TimeoutException {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();        channel.exchangeDeclare("notice2", "direct");      for (int i = 0; i < 10; i++) {          String routingKey = "n"; // normal          if (i % 2 == 0) {              routingKey = "s"; // secret          }          channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());      }      channel.close();      connection.close();  }    @Test  public void receive() throws Exception {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.exchangeDeclare("notice2", "direct");        String queueName = channel.queueDeclare().getQueue();      channel.queueBind(queueName, "notice2", "n");      // channel.queueBind(queueName, "notice2", "s");        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {              System.out.println(new String(body, "UTF-8"));          }      });      synchronized (this) {          wait();      }  }</code></pre>    <p>测试时,我们可以先运行一个 receive ,然后打开 channel.queueBind(queueName, "notice2", "s") 注释,再运行一次 receive ,这样就有两个消费者绑定到notice2交换机上,其中消费者1只能收到normal类型的消息,而消费者2既能收到normal类型的消息,又能收到secret类型的消息。接着可以运行send方法。如不出意外,可以看到如下打印结果:</p>    <pre>  <code class="language-dust">// 消费者1  1  3  5  7  9  // 消费者2  0  1  2  3  4  5  6  7  8  9</code></pre>    <h3>2.5 Topics</h3>    <p>有了以上的改进,我们的 notice 系统基本ok了。但有些时候,我们还需要更加灵活的消息刷选方式。比如我们对于电影信息,我们可能需要对它的地区,类型,限制级进行筛选。这时候就要借助 <strong>Topics</strong> 模型了。</p>    <p>在 <strong>Topics</strong> 模型中,我们“升级”了 <strong>routing key</strong> ,它可以由多个关键词组成,词与词之间由点号( . )隔开。特别地,规定 <strong> * 表示任意的一个词; # 号表示任意的0个或多个词 </strong> 。</p>    <p>假设我们现在需要接收电影信息,每条电影消息附带的 <strong>routingKey</strong> 有地区、类型、限制级3个关键字,即: district.type.age 。现在想实现的功能如下图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/1d388579b2fc933cd5fa7583e45e2ad6.jpg"></p>    <p>如上图所示,队列Q1只关心美国适合13岁以上的电影信息,队列Q2对动作片感兴趣,而队列Q3喜欢中国电影。</p>    <p>下面用 <strong>Java</strong> 代码去实现上述功能,相较于之前基本上没有什么改变,下面直接给出代码:</p>    <pre>  <code class="language-dust">@Test  public void send() throws IOException, TimeoutException {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();        channel.exchangeDeclare("movie", "topic");        channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());      channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());      channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());        channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龙".getBytes());      channel.basicPublish("movie", "Chinese.comedy.13", null, "大话西游".getBytes());      channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯与祝英台".getBytes());        channel.close();      connection.close();  }    @Test  public void receive() throws Exception {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");      Connection connection = factory.newConnection();      Channel channel = connection.createChannel();      channel.exchangeDeclare("movie", "topic");      // 队列1      String queueName = channel.queueDeclare().getQueue();      channel.queueBind(queueName, "movie", "American.*.13");      // 队列2  //        String queueName = channel.queueDeclare().getQueue();  //        channel.queueBind(queueName, "movie", "*.action.*");      // 队列3  //        String queueName = channel.queueDeclare().getQueue();  //        channel.queueBind(queueName, "movie", "Chinese.#");          channel.basicConsume(queueName, true, new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {              System.out.println(new String(body, "UTF-8"));          }        });      synchronized (this) {          wait();      }  }</code></pre>    <p>运行3次 receive 方法,注意打开或关闭相应的注释;再运行 send 方法,可以看到控制台输出如下内容:</p>    <pre>  <code class="language-dust">// 消费者1  The Bourne Ultimatum  Titanic  // 消费者2  The Bourne Ultimatum  卧虎藏龙  // 消费者3  卧虎藏龙  大话西游  梁山伯与祝英台</code></pre>    <h3>2.6 RPC</h3>    <p>第6种模型是用来做RPC(Remote procedure call, 远程程序调用)的。这里直接贴上代码,就不做解释了 。代码演示的是,客户端调用服务端的 fib 方法,得到返回结果。</p>    <p>RPCServer.java</p>    <pre>  <code class="language-dust">import com.rabbitmq.client.*;  import com.rabbitmq.client.AMQP.BasicProperties;    /**   * Description:   *   * @author derker   * @Time 2016-10-26 18:24   */  public class RPCServer {          private static final String RPC_QUEUE_NAME = "rpc_queue";        private static int fib(int n) {          if (n == 0) return 0;          if (n == 1) return 1;          return fib(n - 1) + fib(n - 2);      }        public static void main(String[] argv) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory factory = new ConnectionFactory();              factory.setHost("localhost");              connection = factory.newConnection();              channel = connection.createChannel();              channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);              channel.basicQos(1);              QueueingConsumer consumer = new QueueingConsumer(channel);              channel.basicConsume(RPC_QUEUE_NAME, false, consumer);              System.out.println(" [x] Awaiting RPC requests");              while (true) {                  String response = null;                  QueueingConsumer.Delivery delivery = consumer.nextDelivery();                  AMQP.BasicProperties props = delivery.getProperties();                  BasicProperties replyProps = new BasicProperties                          .Builder()                          .correlationId(props.getCorrelationId())                          .build();                  try {                      String message = new String(delivery.getBody(), "UTF-8");                      int n = Integer.parseInt(message);                      System.out.println(" [.] fib(" + message + ")");                      response = "" + fib(n);                  } catch (Exception e) {                      System.out.println(" [.] " + e.toString());                      response = "";                  } finally {                      channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));                      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);                  }              }          } catch (Exception e) {              e.printStackTrace();          } finally {              if (connection != null) {                  try {                      connection.close();                  } catch (Exception ignore) {                  }              }          }      }  }</code></pre>    <p>RPCClient.java</p>    <pre>  <code class="language-dust">import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.QueueingConsumer;  import com.rabbitmq.client.AMQP.BasicProperties;  import java.util.UUID;    /**   * Description:   *   * @author derker   * @Time 2016-10-26 18:36   */  public class RPCClient {      private Connection connection;      private Channel channel;      private String requestQueueName = "rpc_queue";      private String replyQueueName;      private QueueingConsumer consumer;        public RPCClient() throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          connection = factory.newConnection();          channel = connection.createChannel();            replyQueueName = channel.queueDeclare().getQueue();          consumer = new QueueingConsumer(channel);          channel.basicConsume(replyQueueName, true, consumer);      }        public String call(String message) throws Exception {          String response = null;          String corrId = UUID.randomUUID().toString();            BasicProperties props = new BasicProperties                  .Builder()                  .correlationId(corrId)                  .replyTo(replyQueueName)                  .build();            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));            while (true) {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              if (delivery.getProperties().getCorrelationId().equals(corrId)) {                  response = new String(delivery.getBody(), "UTF-8");                  break;              }          }            return response;      }        public void close() throws Exception {          connection.close();      }        public static void main(String[] argv) {          RPCClient fibonacciRpc = null;          String response = null;          try {              fibonacciRpc = new RPCClient();              System.out.println(" [x] Requesting fib(10)");              response = fibonacciRpc.call("10");              System.out.println(" [.] Got '" + response + "'");          } catch (Exception e) {              e.printStackTrace();          } finally {              if (fibonacciRpc != null) {                  try {                      fibonacciRpc.close();                  } catch (Exception ignore) {                  }              }          }      }  }</code></pre>    <p> </p>    <p>来自:http://www.cnblogs.com/dongkuo/p/6001791.html</p>    <p> </p>    
 本文由用户 25r9n4qy8 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1477534968620.html
RabbitMQ 消息系统