记一次MQTT服务器EMQ X的使用
记一次MQTT服务器EMQ X的使用
1. EMQ X介绍
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。
Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。
MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。
EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
2. 如何安装
我自己使用的是Docker安装的,毕竟很方便,主要是如果安装得有问题或者配置得有问题可以直接删除,大不了重新run就是了。
- 拉取镜像
1
docker pull emqx/emqx
- 启动镜像
1
docker run -d -v /root/emqx-config:/opt/emqx/etc/plugins --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
- 这里详细介绍一下各个端口的说明
端口号 | 说明 |
---|---|
1883 | MQTT/TCP 协议端口 |
11883 | MQTT/TCP 协议内部端口,仅用于本机客户端连接 |
8883 | MQTT/SSL 协议端口 |
8083 | MQTT/WS 协议端口 |
8084 | MQTT/WSS 协议端口 |
18083 | WEB 控制面板 |
8081 | HTTP API 操作EMQ X的API接口 |
EMQ X的插件配置都在
/opt/emqx/etc/plugins
下,我一般比较喜欢把这个文件夹映射出来。不能直接映射出来,如果第一次就以-v
方式挂载的话,容器内的plugins
目录会被清空。所以这里有个技巧,先不加-v
启动一次,再使用docker cp emqx:/opt/emqx/etc/plugins /root/emqx-config
把配置文件导出,再删除这个容器。再以-v
方式启动,这样就可以直接在宿主机上修改配置文件了。当然,如果你不觉得麻烦,也可以不挂载宿主机,直接进入容器修改。在容器环境中,有一些操作EMQ X的基本命令。但是我并不推荐用这个这些命令去操作,毕竟谁会去记一大堆的命令呢?我一般都是通过WEB控制面板来操作和监控EMQ X的,其实它自带的WEB面板就已经非常好用了。
3. 如何使用
我之所以要用到EMQ X,还是因为毕设有一个在线聊天的需求。想了几种解决方案,无非就是socket, 三方IM服务,还有就是MQTT。
socket需要自己实现,麻烦!可能还不稳定,性能也可能不够。
第三方的IM服务,功能强大但太过冗余,毕竟只是一个在线聊天,不涉及其他的群啊、黑名单啊这些乱七八糟的。
MQTT,在看了几种主流的MQTT服务器之后,选择了EMQ X。主要是因为其功能强大,使用简单,文档丰富。
在启动之后,可以去看看启动的日志docker logs emqx
或者直接访问http://127.0.0.1:18083
看是否可以进入WEB控制面板,控制面板的初始账号是admin密码是public。登录之后可以进入通用 > 用户 > 编辑 > 修改密码来修改admin账号的登陆密码。
EMQ X的所有插件都在插件栏下,下面主要介绍一下我在这次毕设中使用的两个插件功能,HTTP鉴权和WEB钩子。
TIP:修改完配置文件之后记得重启插件,建议在WEB控制面板中操作。
HTTP鉴权
其支持的鉴权插件很多,详见https://docs.emqx.cn/broker/v4.3/advanced/auth.html。这里只介绍HTTP方式,其插件名为emqx_auth_http
,顾名思义就是通过HTTP接口来确认用户的连接资格。
认证原理
EMQ X 在设备连接事件中使用当前客户端相关信息作为参数,向用户自定义的认证服务发起请求查询权限,通过返回的 HTTP 响应状态码 (HTTP statusCode) 来处理认证请求。
- 认证失败:API 返回 4xx 状态码
- 认证成功:API 返回 200 状态码
- 忽略认证:API 返回 200 状态码且消息体 ignore
加盐规则与哈希方法
HTTP 在请求中传递明文密码,加盐规则与哈希方法取决于 HTTP 应用。
认证请求
进行身份认证时,EMQ X 将使用当前客户端信息填充并发起用户配置的认证查询请求,查询出该客户端在 HTTP 服务器端的认证数据。
1 | # etc/plugins/emqx_auth_http.conf |
HTTP 请求方法为 GET 时,请求参数将以 URL 查询字符串的形式传递;POST、PUT 请求则将请求参数以普通表单形式或者以 Json 形式提交(由 content-type 的值决定)。
你可以在认证请求中使用以下占位符,请求时 EMQ X 将自动填充为客户端信息:
%u:用户名
%c:Client ID
%a:客户端 IP 地址
%r:客户端接入协议
%P:明文密码
%p:客户端端口
%C:TLS 证书公用名(证书的域名或子域名),仅当 TLS 连接时有效
%d:TLS 证书 subject,仅当 TLS 连接时有效
详细配置说明见配置项
WEB钩子
WebHook 是由 emqx_web_hook插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务 的功能。
WebHook 的内部实现是基于 钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。
以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:
1 | Client | EMQ X | emqx_web_hook | HTTP +------------+ |
WebHook 对于事件的处理是单向的,它仅支持将 EMQ X 中的事件推送给 Web 服务,并不关心 Web 服务的返回。 借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。
配置项
Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf
,配置项的详细说明可以查看 配置项。
触发规则
在 etc/plugins/emqx_web_hooks.conf
可配置触发规则,其配置的格式如下:
1 | ## 格式示例 |
Event 触发事件
目前支持以下事件:
名称 | 说明 | 执行时机 |
---|---|---|
client.connect | 处理连接报文 | 服务端收到客户端的连接报文时 |
client.connack | 下发连接应答 | 服务端准备下发连接应答报文时 |
client.connected | 成功接入 | 客户端认证完成并成功接入系统后 |
client.disconnected | 连接断开 | 客户端连接层在准备关闭时 |
client.subscribe | 订阅主题 | 收到订阅报文后,执行 client.check_acl 鉴权前 |
client.unsubscribe | 取消订阅 | 收到取消订阅报文后 |
session.subscribed | 会话订阅主题 | 完成订阅操作后 |
session.unsubscribed | 会话取消订阅 | 完成取消订阅操作后 |
message.publish | 消息发布 | 服务端在发布(路由)消息前 |
message.delivered | 消息投递 | 消息准备投递到客户端前 |
message.acked | 消息回执 | 服务端在收到客户端发回的消息 ACK 后 |
message.dropped | 消息丢弃 | 发布出的消息被丢弃后 |
Number
同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。
Rule
触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:
- action:字符串,取固定值
- topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发
例如,我们只将与 a/b/c
和 foo/#
主题匹配的消息转发到 Web 服务器上,其配置应该为:
1 | web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"} |
这样 Webhook 仅会转发与 a/b/c
和 foo/#
主题匹配的消息,例如 foo/bar
等,而不是转发 a/b/d
或 fo/bar
。
Webhook 事件参数
事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 url
所配置的 Web 服务器上。其请求格式为:
1 | URL: <url> # 来自于配置中的 url 字段 |
对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:
client.connect
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_connect” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
client.connack
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_connack” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
conn_ack | string | “success” 表示成功,其它表示失败的原因 |
client.connected
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_connected” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
connected_at | integer | 时间戳(秒) |
client.disconnected
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_disconnected” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
reason | string | 错误原因 |
client.subscribe
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_subscribe” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
topic | string | 将订阅的主题 |
opts | json | 订阅参数 |
opts 包含
Key | 类型 | 说明 |
---|---|---|
qos | enum | QoS 等级,可取 0 1 2 |
client.unsubscribe
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”client_unsubscribe” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
session.subscribed:同 client.subscribe
,action 为 session_subscribed
session.unsubscribed:同 client.unsubscribe
,action 为 session_unsubscribe
session.terminated: 同 client.disconnected
,action 为 session_terminated
message.publish
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”message_publish” |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息的时间戳(毫秒) |
message.delivered
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”message_delivered” |
clientid | string | 接收端 ClientId |
username | string | 接收端 Username,不存在时该值为 “undefined” |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息时间戳(毫秒) |
message.acked
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:”message_acked” |
clientid | string | 接收端 ClientId |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息时间戳(毫秒) |
这里有个巨坑,我是直接看官方教程使用的是EMQ X4.0.0版本的镜像,然后在配置WEB钩子的时候,由于我的钩子接口使用的是HTTPS,所以需要配置相关的证书文件,但是按照官方的指示配置好之后。该插件无法启动,报错。当时忘了截图了,反正是说配置文件中的有关ssl的配置项不存在,非法。弄了很久,才发现可能是4.0.0版本不支持,升级到最新版之后错误解决。
当然,这里只是简单介绍了EMQ X的基本用法,如有其他需要请移步EMQ X