背景
这两天在排查一个 RTOS 设备短时间内重启几十次之后连接不上 MQTT 的问题,记录一下,顺便简单介绍下 MQTT 协议。
MQTT轻量发布订阅消息协议
概览
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP 协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的 发布/订阅 型消息协议,是由 IBM 公司开发的,“MQTT” 中的 “MQ” 是来自于 IBM 的 MQ 系列消息队列产品线。它具有以下主要的几项特性:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
- 对负载内容屏蔽的消息传输
- 使用 TCP/IP 提供网络连接
有三种消息发布服务质量
- Qos0 “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
- Qos1 “至少一次”,确保消息到达,但消息重复可能会发生
- Qos2 “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制
除了 MQTT 的协议特性外还有一些客观原因:
- 开放消息协议,简单易实现
- 1字节固定报头,2字节心跳报文,报文结构紧凑
- 消息QoS支持,可靠传输保证
- 主流语言的客户端都有,并且有数十个 MQTT 服务器端程序可供选择
- 大部分硬件方案天生支持
应用
MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。
- 物联网 M2M 通信,物联网大数据采集
- Android 消息推送,WEB 消息推送
- 移动即时消息,例如:Facebook Messenger
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
所以综上所述,在公司做 IOT 设备相关的业务时,最终在技术选型上选择了使用 MQTT 协议。同时在智能设备即 Android 设备上,将原来用的极光推送、小米推等消息推送,替换为 MQTT 推送。
附上:
MQTT官网:http://mqtt.org
MQTT V3.1.1协议规范:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
MQTT官方:https://github.com/mqtt/mqtt.github.io
服务中间件列表:https://github.com/mqtt/mqtt.github.io/wiki/servers
客户端列表:https://github.com/mqtt/mqtt.github.io/wiki/libraries
MQTT基于主题(Topic)消息路由
MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:
chat/room/1
sensor/10/temperature
sensor/+/temperature
$SYS/broker/metrics/packets/received
$SYS/broker/metrics/#
主题 (Topic) 通过 ’/’ 分割层级,支持 ’+’, ‘#’ 通配符:
'+': 表示通配一个层级,例如a/+,匹配a/x, a/y
'#': 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d
订阅者与发布者之间通过主题路由消息进行通信,例如采用 mosquitto 命令行发布订阅消息:
mosquitto_sub -t a/b/+ -q 1
mosquitto_pub -t a/b/c -m hello -q 1
MQTT V3.1.1协议报文
报文结构
固定报头(Fixed header) |
可变报头(Variable header) |
报文有效载荷(Payload) |
固定报头
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
byte1 | MQTT Packet type | Flags | ||||||
byte2… | Remaining Length | |||||||
报文类型
类型名称 | 类型值 | 报文说明 |
---|---|---|
CONNECT | 1 | 发起连接 |
CONNACK | 2 | 连接回执 |
PUBLISH | 3 | 发布消息 |
PUBACK | 4 | 发布回执 |
PUBREC | 5 | QoS2消息回执 |
PUBREL | 6 | QoS2消息释放 |
PUBCOMP | 7 | QoS2消息完成 |
SUBSCRIBE | 8 | 订阅主题 |
SUBACK | 9 | 订阅回执 |
UNSUBSCRIBE | 10 | 取消订阅 |
UNSUBACK | 11 | 取消订阅回执 |
PINGREQ | 12 | PING请求 |
PINGRESP | 13 | PING响应 |
DISCONNECT | 14 | 断开连接 |
PUBLISH发布消息
PUBLISH 报文承载客户端与服务器间双向的发布消息。 PUBACK 报文用于接收端确认 QoS1 报文,PUBREC/PUBREL/PUBCOMP 报文用于QoS2消息流程。
PINGREQ/PINGRESP心跳
客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送 PINGREQ 心跳报文,服务端响应 PINGRESP 报文。PINGREQ/PINGRESP 报文均2个字节。
MQTT消息QoS
MQTT 发布消息 QoS 保证不是端到端的,是客户端与服务器之间的。订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS。
发布消息的QoS | 主题订阅的QoS | 接收消息的QoS |
---|---|---|
0 | 0 | 0 |
0 | 1 | 0 |
0 | 2 | 0 |
1 | 0 | 0 |
1 | 1 | 1 |
1 | 2 | 1 |
2 | 0 | 0 |
2 | 1 | 1 |
2 | 2 | 2 |
Qos0消息发布订阅
最多一次的传输
消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。
Qos1消息发布订阅
至少一次的传输
服务器接收到消息会被确认,通过传输一个 PUBACK 信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的 DUP 位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE 和 UNSUBSCRIBE 都使用 level 1 的QoS。如果客户端没有接收到 PUBACK 信息(无论是应用定义的超时,还是检测到失败然后通讯 session 重启),客户端都会再次发送 PUBLISH 信息,并且将 DUP 位置1。当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个 PUBACK 消息。所以在堵塞的情况下会出现很多的重复消息。
Qos2消息发布订阅
只有一次的传输
在 QoS level 1 上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。QoS level 2 在消息头有 Message ID。
MQTT会话(Clean Session)
MQTT 客户端向服务器发起 CONNECT 请求时,可以通过 ’Clean Session’ 标志设置会话。
‘Clean Session’ 设置为0 或 false,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’ 设置为1 或 true,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
MQTT连接保活心跳
MQTT 客户端向服务器发起 CONNECT 请求时,通过 KeepAlive 参数设置保活周期。
客户端在无报文发送时,按 KeepAlive 周期定时发送 2字节的 PINGREQ 心跳报文,服务端收到 PINGREQ 报文后,回复 2字节的 PINGRESP 报文。
服务端在 1.5 个心跳周期内,既没有收到客户端发布订阅报文,也没有收到 PINGREQ 心跳报文时,主动心跳超时断开客户端 TCP 连接。
MQTT遗愿消息(Last Will)
MQTT 客户端向服务器端 CONNECT 请求时,可以设置是否发送遗愿消息 (Will Message) 标志,和遗愿消息主题 (Topic) 与内容 (Payload)。
MQTT 客户端异常下线时 (客户端断开前未向服务器发送 DISCONNECT 消息),MQTT 消息服务器会发布遗愿消息。
MQTT保留消息(Retained Message)
MQTT 客户端向服务器发布 (PUBLISH) 消息时,可以设置保留消息 (Retained Message) 标志。保留消息 (Retained Message) 会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。
例如 mosquitto 命令行发布一条保留消息到主题 ’a/b/c’:
mosquitto_pub -r -q 1 -t a/b/c -m 'hello'
之后连接上来的 MQTT 客户端订阅主题 ’a/b/c’ 时候,仍可收到该消息:
$ mosquitto_sub -t a/b/c -q 1
hello
保留消息 (Retained Message) 有两种清除方式:
- 客户端向有保留消息的主题发布一个空消息:
mosquitto_pub -r -q 1 -t a/b/c -m ''
- 消息服务器设置保留消息的超期时间
MQTT WebSocket连接
MQTT 协议除支持 TCP 传输层外,还支持 WebSocket 作为传输层。通过 WebSocket 浏览器可以直连 MQTT 消息服务器,发布订阅模式与其他 MQTT 客户端通信。
MQTT 协议的 WebSocket 连接,必须采用 binary 模式,并携带子协议 Header:
Sec-WebSocket-Protocol: mqttv3.1 或 mqttv3.1.1
MQTT与XMPP协议对比
MQTT 协议设计简单轻量、路由灵活,将在移动互联网物联网消息领域,全面取代 PC 时代的 XMPP 协议:
- MQTT 协议一个字节固定报头,两个字节心跳报文,报文体积小编解码容易。XMPP 协议基于繁重的 XML,报文体积大且交互繁琐
- MQTT 协议基于主题 (Topic) 发布订阅模式消息路由,相比 XMPP 基于 JID 的点对点消息路由更为灵活
- MQTT 协议未定义报文内容格式,可以承载 JSON、二进制等不同类型报文。XMPP 协议采用 XML 承载报文,二进制必须Base64 编码等处理
- MQTT 协议支持消息收发确认和 QoS 保证,XMPP 主协议并未定义类似机制。MQTT 协议有更好的消息可靠性保证。
问题记录
说是记录问题,没想到扯了这么多,言归正传。说问题,这个问题可以说是 Clean Session 引发的血案,之前的 IoT 设备都是设置的 true,也就是每次连接都会创建一个新的临时会话,断开会话自动销毁。但是最新生产的设备打印机,需要在离线时候也需要接收到离线消息,保证打印不漏单和一个打印的时效性,就设置了 Clean Session 为 false,创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
然后测试在设备性能压测和健壮性测试过程中,发现反复短时间内重启设备50次左右,同时后台不断推送订单进行打印,设备再重启联网后,就连接不上 MQTT 了,log如下:
疯狂报mqtt_revTopicMsg() app_mqtt_recv_msg_count fail -40035
,app_mqtt_subscribe() mqtt_subscribe ret = -40030
,mqtt_connectServer() SubcribeAllTopic fail
等错误。
但是把设备 Clean Session 改为 true,就可以马上连接上,一度怀疑是我服务端的问题,服务器拒绝了设备的连接,就进行了一波深入排查。因为我们消息使用的是 Qos2 等级。所以很快发现了引起这个现象的原因:设备在重启之前有 PUBREL 消息未回复 PUBCOMP 给服务器,导致重新连接后服务器重发 PUBREL 到设备,而设备认为是非法消息而关闭了连接(可参照本文中 Qos2消息发布订阅流程图)。
终于真相了,最后解决办法暂时是:设备收到这样的消息也当成正常的,回复 PUBCOMP 过去,就能正常连接了。也有别的办法,从业务层面规避这个问题,就是不用 MQTT 做消息持久化,转而通过第三方中间件来做消息持久化,毕竟 MQTT 协议本来就不适合做消息持久化,当成消息通道即可。具体问题具体解决吧!看个人的选择。