记一次MQTT服务器EMQ X的使用

1. EMQ X介绍

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。

Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。

MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

2. 如何安装

我自己使用的是Docker安装的,毕竟很方便,主要是如果安装得有问题或者配置得有问题可以直接删除,大不了重新run就是了。

  • 拉取镜像
    1
    docker pull emqx/emqx
  • 启动镜像
    1
    docker run -d -v /root/emqx-config:/opt/emqx/etc/plugins --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
  1. 这里详细介绍一下各个端口的说明
端口号 说明
1883 MQTT/TCP 协议端口
11883 MQTT/TCP 协议内部端口,仅用于本机客户端连接
8883 MQTT/SSL 协议端口
8083 MQTT/WS 协议端口
8084 MQTT/WSS 协议端口
18083 WEB 控制面板
8081 HTTP API 操作EMQ X的API接口
  1. EMQ X的插件配置都在/opt/emqx/etc/plugins下,我一般比较喜欢把这个文件夹映射出来。不能直接映射出来,如果第一次就以-v方式挂载的话,容器内的plugins目录会被清空。所以这里有个技巧,先不加-v启动一次,再使用docker cp emqx:/opt/emqx/etc/plugins /root/emqx-config 把配置文件导出,再删除这个容器。再以-v方式启动,这样就可以直接在宿主机上修改配置文件了。当然,如果你不觉得麻烦,也可以不挂载宿主机,直接进入容器修改。

  2. 在容器环境中,有一些操作EMQ X的基本命令。但是我并不推荐用这个这些命令去操作,毕竟谁会去记一大堆的命令呢?我一般都是通过WEB控制面板来操作和监控EMQ X的,其实它自带的WEB面板就已经非常好用了。

3. 如何使用

我之所以要用到EMQ X,还是因为毕设有一个在线聊天的需求。想了几种解决方案,无非就是socket三方IM服务还有就是MQTT

  1. socket需要自己实现,麻烦!可能还不稳定,性能也可能不够。

  2. 第三方的IM服务,功能强大但太过冗余,毕竟只是一个在线聊天,不涉及其他的群啊、黑名单啊这些乱七八糟的。

  3. MQTT,在看了几种主流的MQTT服务器之后,选择了EMQ X。主要是因为其功能强大,使用简单,文档丰富。

在启动之后,可以去看看启动的日志docker logs emqx或者直接访问http://127.0.0.1:18083看是否可以进入WEB控制面板,控制面板的初始账号是admin密码是public。登录之后可以进入通用 > 用户 > 编辑 > 修改密码来修改admin账号的登陆密码。

EMQ X的所有插件都在插件栏下,下面主要介绍一下我在这次毕设中使用的两个插件功能,HTTP鉴权WEB钩子

TIP:修改完配置文件之后记得重启插件,建议在WEB控制面板中操作。

HTTP鉴权

其支持的鉴权插件很多,详见https://docs.emqx.cn/broker/v4.3/advanced/auth.html。这里只介绍HTTP方式,其插件名为emqx_auth_http,顾名思义就是通过HTTP接口来确认用户的连接资格。

认证原理

EMQ X 在设备连接事件中使用当前客户端相关信息作为参数,向用户自定义的认证服务发起请求查询权限,通过返回的 HTTP 响应状态码 (HTTP statusCode) 来处理认证请求。

  • 认证失败:API 返回 4xx 状态码
  • 认证成功:API 返回 200 状态码
  • 忽略认证:API 返回 200 状态码且消息体 ignore

加盐规则与哈希方法

HTTP 在请求中传递明文密码,加盐规则与哈希方法取决于 HTTP 应用。

认证请求

进行身份认证时,EMQ X 将使用当前客户端信息填充并发起用户配置的认证查询请求,查询出该客户端在 HTTP 服务器端的认证数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# etc/plugins/emqx_auth_http.conf

## 请求地址
auth.http.auth_req = http://127.0.0.1:80/mqtt/auth

## HTTP 请求方法
## Value: post | get | put
auth.http.auth_req.method = post

## 认证请求的 HTTP 请求头部,默认情况下配置 Content-Type 头部。
## Content-Type 头部目前支持以下值:application/x-www-form-urlencoded,application/json
auth.http.auth_req.headers.content-type = application/x-www-form-urlencoded

## 请求参数
auth.http.auth_req.params = clientid=%c,username=%u,password=%P

HTTP 请求方法为 GET 时,请求参数将以 URL 查询字符串的形式传递;POST、PUT 请求则将请求参数以普通表单形式或者以 Json 形式提交(由 content-type 的值决定)。

你可以在认证请求中使用以下占位符,请求时 EMQ X 将自动填充为客户端信息:

  • %u:用户名

  • %c:Client ID

  • %a:客户端 IP 地址

  • %r:客户端接入协议

  • %P:明文密码

  • %p:客户端端口

  • %C:TLS 证书公用名(证书的域名或子域名),仅当 TLS 连接时有效

  • %d:TLS 证书 subject,仅当 TLS 连接时有效

详细配置说明见配置项

WEB钩子


WebHook 是由 emqx_web_hook插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务 的功能。

WebHook 的内部实现是基于 钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

1
2
3
4
  Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
=============>| - - - - - - -> - - - - - - - ->===========> | Web Server |
| Broker | | Request +------------+

WebHook 对于事件的处理是单向的,它仅支持将 EMQ X 中的事件推送给 Web 服务,并不关心 Web 服务的返回。 借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

配置项

Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf,配置项的详细说明可以查看 配置项

触发规则

etc/plugins/emqx_web_hooks.conf 可配置触发规则,其配置的格式如下:

1
2
3
4
5
6
## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

Event 触发事件

目前支持以下事件:

名称 说明 执行时机
client.connect 处理连接报文 服务端收到客户端的连接报文时
client.connack 下发连接应答 服务端准备下发连接应答报文时
client.connected 成功接入 客户端认证完成并成功接入系统后
client.disconnected 连接断开 客户端连接层在准备关闭时
client.subscribe 订阅主题 收到订阅报文后,执行 client.check_acl 鉴权前
client.unsubscribe 取消订阅 收到取消订阅报文后
session.subscribed 会话订阅主题 完成订阅操作后
session.unsubscribed 会话取消订阅 完成取消订阅操作后
message.publish 消息发布 服务端在发布(路由)消息前
message.delivered 消息投递 消息准备投递到客户端前
message.acked 消息回执 服务端在收到客户端发回的消息 ACK 后
message.dropped 消息丢弃 发布出的消息被丢弃后

Number

同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

Rule

触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 a/b/cfoo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:

1
2
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

这样 Webhook 仅会转发与 a/b/cfoo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/dfo/bar

Webhook 事件参数

事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 url 所配置的 Web 服务器上。其请求格式为:

1
2
3
URL: <url>      # 来自于配置中的 url 字段
Method: POST # 固定为 POST 方法
Body: <JSON> # Body 为 JSON 格式字符串

对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:

client.connect

Key 类型 说明
action string 事件名称 固定为:”client_connect”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号

client.connack

Key 类型 说明
action string 事件名称 固定为:”client_connack”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号
conn_ack string “success” 表示成功,其它表示失败的原因

client.connected

Key 类型 说明
action string 事件名称 固定为:”client_connected”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号
connected_at integer 时间戳(秒)

client.disconnected

Key 类型 说明
action string 事件名称 固定为:”client_disconnected”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
reason string 错误原因

client.subscribe

Key 类型 说明
action string 事件名称 固定为:”client_subscribe”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
topic string 将订阅的主题
opts json 订阅参数

opts 包含

Key 类型 说明
qos enum QoS 等级,可取 0 1 2

client.unsubscribe

Key 类型 说明
action string 事件名称 固定为:”client_unsubscribe”
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 “undefined”
topic string 取消订阅的主题

session.subscribed:同 client.subscribe,action 为 session_subscribed

session.unsubscribed:同 client.unsubscribe,action 为 session_unsubscribe

session.terminated: 同 client.disconnected,action 为 session_terminated

message.publish

Key 类型 说明
action string 事件名称 固定为:”message_publish”
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 “undefined”
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息的时间戳(毫秒)

message.delivered

Key 类型 说明
action string 事件名称 固定为:”message_delivered”
clientid string 接收端 ClientId
username string 接收端 Username,不存在时该值为 “undefined”
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 “undefined”
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息时间戳(毫秒)

message.acked

Key 类型 说明
action string 事件名称 固定为:”message_acked”
clientid string 接收端 ClientId
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 “undefined”
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息时间戳(毫秒)

这里有个巨坑,我是直接看官方教程使用的是EMQ X4.0.0版本的镜像,然后在配置WEB钩子的时候,由于我的钩子接口使用的是HTTPS,所以需要配置相关的证书文件,但是按照官方的指示配置好之后。该插件无法启动,报错。当时忘了截图了,反正是说配置文件中的有关ssl的配置项不存在,非法。弄了很久,才发现可能是4.0.0版本不支持,升级到最新版之后错误解决。

当然,这里只是简单介绍了EMQ X的基本用法,如有其他需要请移步EMQ X