| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
MaiKashiwag
6年前发布

Nginx通过LUA脚本访问RabbitMQ消息队列

来自: http://blog.csdn.net//jiao_fuyou/article/details/38753005


发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的

关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter

关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC

关于STOMP协议相关资料参见这里:

STOMP官方英文协议1.1版

STOMP协议学习与实战-1.1版

STOMP官方英文协议1.2版

STOMP协议说明书-1.2版


nginx配置:

location /rabbitmq {              lua_code_cache off;              content_by_lua_file conf/rabbitmq.lua;          }

rabbitmq.lua源码如下,该脚本简单的实现pub一个消息然后再sub该消息,set_timeout(60000)表示sub消息如果没有消息时等待60S超时

local strlen =  string.len  local json = require "json"  local rabbitmq = require "rabbitmqstomp"    local mq, err = rabbitmq:new()  if not mq then        return  end    mq:set_timeout(60000)    local ok, err = mq:connect {                      host = "127.0.0.1",                      port = 61613,                      username = "guest",                      password = "guest",                      vhost = "/"                  }  if not ok then      return  end  ngx.log(ngx.INFO, "Connect: " .. "OK")    local msg = {key="value1", key2="value2"}  local headers = {}  headers["destination"] = "/queue/my_queue"  headers["receipt"] = "msg#1"  headers["app-id"] = "luaresty"  headers["persistent"] = "true"  headers["content-type"] = "application/json"    local ok, err = mq:send(json.encode(msg), headers)  if not ok then      return  end  ngx.log(ngx.INFO, "Published: " .. json.encode(msg))    local headers = {}  headers["destination"] = "/queue/my_queue"  headers["persistent"] = "true"  headers["id"] = "123"    local ok, err = mq:subscribe(headers)  if not ok then      return  end    local data, err = mq:receive()  if not data then      return  end  ngx.log(ngx.INFO, "Consumed: " .. data)  ngx.header.content_type = "text/plain";  ngx.say(data);    local headers = {}  headers["persistent"] = "true"  headers["id"] = "123"    local ok, err = mq:unsubscribe(headers)    local ok, err = mq:set_keepalive(10000, 10000)  if not ok then      return  end

具体扩展一下,可以实现推送功能:发送方调用send将消息pub到RabbitMQ,接收方设置一个超时调用subscribe订阅消息(就类似于long polling)


 本文由用户 MaiKashiwag 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1455252002980.html
消息系统