| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
HugRenner
9年前发布

[译]RabbitMQ系列教程(一):Hello World

来自: http://my.oschina.net/andylucc/blog/605746


RabbitMQ是一个消息中间件。基本思想非常简单:接收和转发消息。你可以把它想象成一个邮局:当我们往邮箱里面投递一个信件的时候,我们非常确信邮政快递员能够把我们的邮件送达到接受者的手中。使用这个隐喻,RabbitMQ是一个邮箱,是一个邮政局,是一个邮政快递员。

与传统的邮政局不同的是,RabbitMQ处理的不是纸质的邮件,而是二进制数据构成的消息。

关于RabbitMQ和消息的一些行话:

1,生产消息即发送,一个发送消息的进程叫Producer。我们用下面的图表示(一个被标记为P的圆圈):

2,queue代表接消息的邮箱,存在于RabbitMQ中。尽管消息在应用和RabbitMQ之间流动,但是他们只能被存储在queue之中。queue是没有任何限制的,它可以用来存储任何你想存储的消息,它本质上是一个无限制的buffer。多个producer可以向一个queue中发送消息。多个consumer可以从一个queeue中接收消息。队列可以用下面的图来描述:


3,consuming意味着接收,consumer是一个想要接收消息的进程,同样,我们可以用下面的图来表达consumer:



值得注意的是,producer、consumer和broker不一定是在同一台机器上,的确在通常的使用中很少有人在同一台机器上使用它。


Hello World!

(使用pika 0.10.0 Python client)

下面的例子不会太复杂——我们发送一个消息,接收它并把它打印在终端。为了完成这件小事,我们需要写两端小程序:一个是用来发送消息的,另一个是用来接收消息并打印的。

我们的设计大概可以用下面的图来表示:

producer生产消息发送至hello queue,consumer从hello queue中接收消息。

RabbitMQ libraries    RabbitMQ是基于AMQP 0.9.1版本,它是一个开放的,用于处理消息的协议。现在已经有基于不同语言的多种版本的RabbitMQ客户端,我们将会使用Pika,它是RabbitMQ团队推荐的一个客户端,我们可以使用pip包管理工具来安装它。

发送消息:

我们的第一个小程序是send.py,我们将会向队列发送一个消息。这个时候我们需要做的第一件事情就是和RabbitMQ服务器建立连接。

#!/usr/bin/env python  import pika  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  channel = connection.channel()

通过上面的代码我们和本覅RabbitMQ的broker建立了一条物理连接,如果想要和不同的机器建立连接,这里只需要修改一下IP地址或域名即可。接下来,在发送消息之前,我们需要确保接收队列存在。如果我们将消息发送至一个不存在的队列,RabbitMQ会直接将消息丢弃。我们先创建一个叫“hello”的队列:

channel.queue_declare(queue='hello')

这个时候我们已经可以发送消息了,我们的第一个消息只包含一个字符串“Hello World”,我们将会把它发送至“hello”队列。

在RabbitMQ中,消息不是被直接送到队列的,而是首先被送到exchange中。我们目前不需要了解的那么详细,或者我们可以通过第三方的教程来学习有关exchanges的知识。我们现在所需要知道的是如何使用空字符串标识的默认exchange。这个exchange是比较特殊的,它使得我们可以指定消息应该被送往哪里。队列名字需要在routing_key参数中指定。

channel.basic_publish(exchange='',                        routing_key='hello',                        body='Hello World!')print(" [x] Sent 'Hello World!'")

在退出程序之前,我们需要确认网络缓冲区的内容被刷新,内容被发送出去,我们可以通过关闭连接来达到效果:

connection.close()

接收消息:

我们的第二个程序receive.py将会送队列中接收消息并且将消息的内容打印出来。因此,我们需要再次连接RabbitMQ服务器,相关代码和之前相同。下一步,和之前一样,我们必须确保队列存在。因此我们可以用queue_declare。对于queue_declare接口,我们可以多次调用它,而结果是只有一个队列会被创建。

channel.queue_declare(queue='hello')

你可能有点疑问,我们为什么要再一次declare队列,在之前的代码中,我们已经declare了呀。主要是因为我们一定要确认在发送消息前队列是存在的,否则消息将会被丢弃。

我们可以通过命令行来查看队列:  $ sudo rabbitmqctl list_queues  Listing queues ...  hello    0  ...done.

从消息队列接收消息是相对复杂的一件事,我们通过给队列绑定一个回调函数,当我们接收到一条消息的时候,回调函数会被pika库调用,下面这个回调函数将会把消息打印出来。

def callback(ch, method, properties, body):      print(" [x] Received %r" % body)

接下来,接下来我们将要告诉RabbitMQ这个回调函数将会从指定的“hello”队列来接收消息:

channel.basic_consume(callback,                        queue='hello',                        no_ack=True)

no_ack参数将会在后面介绍。

最后,我们进入了一个死循环,监听消息,当有消息的时候我们将消息打印出来,然后继续监听。

print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

下面的完整的代码:

send.py

#!/usr/bin/env python  import pika  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  channel = connection.channel()  channel.queue_declare(queue='hello')  channel.basic_publish(exchange='',                        routing_key='hello',                        body='Hello World!')  print(" [x] Sent 'Hello World!'")  connection.close()

receive.py

#!/usr/bin/env python  import pika  connection = pika.BlockingConnection(pika.ConnectionParameters(          host='localhost'))  channel = connection.channel()  channel.queue_declare(queue='hello')  def callback(ch, method, properties, body):      print(" [x] Received %r" % body)  channel.basic_consume(callback,                        queue='hello',                        no_ack=True)  print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()

现在我们可以在终端试运行我们的程序,首先,我们来向队列发送一条消息:

$ python send.py   [x] Sent 'Hello World!'

然后来接收消息:

 $ python receive.py    [*] Waiting for messages. To exit press CTRL+C    [x] Received 'Hello World!'

好了,例子到这里结束,下一节会介绍如何创建一个工作队列。

 本文由用户 HugRenner 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1453303605355.html
ActiveMQ 消息系统