RabbitMQ高级指南——AMQP解析(上)
<p>AMQP(Advanced Message Queuing Protocol) <strong>高级消息队列协议</strong> 是一个规范,它规定了</p> <ul> <li> <p>系统模型,定义了AMQP中各个 <strong>实体</strong> 的名称,交互流程</p> </li> <li> <p>通讯规范,定义了实体之间交互的数据包格式、 <strong>实体</strong> 之间通讯的具体命令</p> </li> <li> <p>序列化标准,定义了通讯数据的序列化和反序列化格式</p> </li> </ul> <p>《AMQP解析》分为上下,主要讨论系统模型和通讯规范。这两部分是AMQP的核心内容,理解这两部分内容也就掌握了AMQP的精髓。</p> <h2><strong>AMQP模型</strong></h2> <p> </p> <p>也许你见过下面这幅图</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/fee32e73ad050d8731ecf6aad31dc1cc.jpg"></p> <p>这就是AMQP的 <strong>实体</strong> 模型,它把整个系统分代表 <strong>客户端</strong> 的Publisher(生产者)、Consumer(消费者);代表 <strong>服务器端</strong> 的Broker。 不同于我们用过的其他MQ系统"消息=队列",AMQP模型解耦和 <strong>消息</strong> 和 <strong>队列</strong> ,我们模拟一个Publisher发送log到Consumer。</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/af3e431e80a3ab88e276e3d2bf0a3520.gif"></p> <ul> <li> <p>Publisher产生一条数据,发送 到Broker。例子中就是发送 到服务器</p> </li> <li> <p>Broker中的Exchange可以理解为一个 <strong>规则表</strong> (routing key和queue的映射关系——Binding),Broker收到消息后根据 <strong>routing key</strong> 查询投递的目标 <strong>queue</strong></p> </li> <li> <p>Consumer向Broker发送 <strong>订阅</strong> 消息的时候会指定自己监听哪个queue,当有数据到达queue的时broker会推送数据到consumer。</p> </li> </ul> <p>Publisher指明谁可以收到消息(routing key)而不具体指定某个Queue;Consumer只是纯粹的监听Queue而不会关心数据从而来 。Exchange非常像计算机网络通讯交换机, <strong>收到数据后查询转发表决定下一跳出口</strong> ;实际上Exchange并不真实的存在它仅仅是一个规则表。匹配->投递的动作是由Broker内部的进程完成的。 Publisher、Consumer连接到Broker都是通过 <strong>Connection</strong> 完成的,它代表了一个TCP连接。TCP连接对于操作系统来说是非常昂贵的资源,如果每秒钟千上万个消息发送给Broker,每次发送都是一个TCP连接这是一笔非常大的系统开销。为了提高通道的利用率AMQP规定了channel的概念,可以把它理解为和broker的一次会话。如果你接触过Http2.0应该知道多路复用(Multiplexing),这个特性和AMQP是一模一样的。</p> <h2><strong>Exchange的类型</strong></h2> <p> </p> <p>Exchange代表了消息和Queue的映射关系,它非常灵活。AMQP定义了三种类型的关联方式</p> <ul> <li> <p>直接关联(direct)</p> </li> </ul> <p style="text-align: center;"><img src="https://simg.open-open.com/show/90c673370315a8b052649043eb35a06a.png"></p> <p>这种类型的Exchange看似简单其实非常有威力,它的逻辑是:Exchange根据routing key寻找到对应的Queue。比如我定义下面的Exchange</p> <p><img src="https://simg.open-open.com/show/928038832f046db7238d8c44931671c0.jpg"> 我们需要把“性能数据”分别存放在Mysql和InfluxDB,可以定义一个“metric”Exchange,把“mysql”、“InfluxDB”和“mertic”做绑定(binding)。 Broker收到发送给Exchange的数据后会判断routing key,如果publisher没有指定routing key则分别投递给InfluxDB、mysql。所以看起来就像是一条数据被镜像成两份分别发给两个Queue。 “空”也是一种routing key。设置为“空”其实还是会发生“比较”,只不过publisher刚好也是“空”所以才会往两个队列中同时发送。 如果我们把mysql的绑定修改一下</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/0cd7f90a7ac3348e2ba125b170ecb814.jpg"></p> <p>Broker收到发送给Exchange的数据后会判断routing key,如果publisher没有指定routing key则投递给InfluxDB;如果routing key是mysql则投递给mysql。</p> <ul> <li> <p>广播(fanout)</p> </li> </ul> <p style="text-align: center;"><img src="https://simg.open-open.com/show/cd4630c34b830eb0c63359e1b21b096c.png"></p> <p>这种类型的Exchange最简单,无视routing key,有多少个Queue和Exchange绑定就直接推送到Queue。 比如上面的例子中如果Exchange是fanout类型的,则无论你设置不设置routing key,publisher发送不发送routing key;Exchange都会分别投递到两个Queue。一般这种类型的Exchange很少用。</p> <ul> <li> <p>模糊匹配(topic)</p> </li> </ul> <p style="text-align: center;">和direct很像,direct比较routing key的时候是严格比较。topic则允许你用“模糊匹配”。 <img src="https://simg.open-open.com/show/40ac94694ab163c8514c9cadc4666f67.jpg"></p> <p>上面的映射可以翻译为:</p> <ul> <li> <p>所有cpu、mem开头的数据给mysql Queue</p> </li> <li> <p>所有disk、network开头的数据给InfluxDB Queue</p> </li> </ul> <p>发送数据的时候我们可以这样</p> <p><img src="https://simg.open-open.com/show/dbb2c30b5506288ce4643eb5151b27d4.jpg"></p> <p>内存数据我们可以用“mem.free”、“mem.usage”,它们都会被投递到mysql queue。同样的道理,disk、network开头的数据会被投递到InfluxDB队列中。 需要特别注意的是</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/7c29bd47519854c999e1447d8994afa3.jpg"></p> <p>如果你routing key什么都不设置,它的含义不是指“匹配所有的消息”;而是指匹配“routing key为空”的数据(publisher发送的routing key是空)。如果你要匹配所有的数据,正确的方法应该是——“#”。 提示:RabbitMQ还有一个扩展类型的Exchange——header。它会根据publisher的发送数据的头部信息选择Queue。比如可以根据header部分的userId选择Queue其实有点像IM了(userId代表的是发送者用户名)</p> <h2><strong>小技巧</strong></h2> <p>学习的最好方式是实践,AMQP的规范并不像想想的那么复杂,特别是RabbitMQ的实现代码非常简洁,甚至有人怀疑AMQP协议是专门为Erlang定制的(其实二者真没有任何关系) RabbitMQ提供了一个漂亮的Web界面可以让我们不用写一行代码就能实验AMQP的所有功能,修改rabbitmq的配置文件启用rabbitmq_management插件</p> <pre> 配置文件enabled_plugins [rabbitmq_management].</pre> <p>你可以用命令行搞定 rabbitmq-plugins enable rabbitmq_management 重启RabbitMQ后访问15672端口</p> <p>过命令行新建一个用户</p> <pre> rabbitmqctl add_user fireflyc 123 #新建用户fireflyc,密码123 rabbitmqctl set_user_tags fireflyc administrator #设置fireflyc是超级管理员</pre> <p>登录</p> <p style="text-align: center;"><img src="https://simg.open-open.com/show/7444fe342e6e34e564dc85af9e01610e.jpg"></p> <ul> <li> <p>Exchange标签我们可以看到所有的Exchange,最底下可以Add a new exchange</p> </li> <li> <p>Queue标签我们可以看到所有的Queue,最底下可以Add a new queue</p> </li> <li> <p>点击某个Exchange我们可以看到Binding,可以通过Publish Message发送消息</p> </li> <li> <p>点击某个Queue我们可以通过Get message获取队列中的消息</p> </li> </ul> <p> </p> <p> </p> <p style="text-align:center"> </p> <p> </p> <p>来自:http://mp.weixin.qq.com/s?__biz=MzIxMjAzMDA1MQ==&mid=2648945635&idx=1&sn=966633eeba2567e7759b597e43568054&chksm=8f5b54efb82cddf9678821ad9708fc404c087034471f3385ccac09dae0392a146b3673e3ccbd#rd</p> <p> </p>
本文由用户 nb400195 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
转载本站原创文章,请注明出处,并保留原始链接、图片水印。
本站是一个以用户分享为主的开源技术平台,欢迎各类分享!