MQTT(消息传输协议)

# 简介 1. 优势:长连接,低带宽,高可靠。 2. 实用场景:需要服务器下发消息给设备,需要及时收到。例如,远程开关,充电桩等 # 几个重要名词 1. **username**用户名,**password**密码,**clientid**用户标识,mqtt可以通过前两个参数保证连接的安全,通过clientid确保设备唯一性。 2. **qos**消息质量,分为0,1,2三个等级,分别表示只发一次,至少收到一次和只收到一次,通过qos可以根据数据的重要性灵活选择发送方式以节省带宽和保证数据可靠。例如不重要的数据采集可使用qos0只发一次不关心服务器是否收到,重要的消息通过qos1确保能收到,付款信息危险设备开关等场景使用qos2保证数据到达的同时且不重复。 3. **host**主机,**port**端口,mqtt需要连接的服务器参数。host可以说ip或者域名。 4. **topic**主题,根据主题区别消息类型和来源,主要用来分类数据。同时mqtt是发布订阅模型,topic是发布和订阅者通信的重要通道。 5. **payload**消息内容,发布和订阅的具体数据。 6. **retain**保留消息,保留消息是一条将保留标志(retained flag)置为true的普通MQTT消息。broker会针对主题依照QoS级别保留最后一条保留消息,当订阅者订阅主题时会立即收到保留消息。broker仅为每个主题保留一条保留消息。 # API说明 mqtt的API在LuatOS-Air lib有做封装,建议直接用lib的API接口。 |API接口| 描述| | --- | --- | | mqtt.client()| 创建一个mqtt client实例| | mqttc:connect()| 连接mqtt服务器| | mqttc:subscribe()| 订阅主题| | mqttc:unsubscribe()| 取消订阅主题| | mqttc:publish()| 发布一条消息| | mqttc:receive()| 接收消息| | mqttc:disconnect()| 断开与服务器的连接| > 详细的API介绍见[mqtt API章节](https://doc.openluat.com/wiki/21?wiki_page_id=2280 "mqtt API章节") # 实现流程 - 创建任务 通过sys.taskInit() 创建一个协程。 - 等待网络就绪 采用socket.isReady()这个接口阻塞操作,程序运行到这里会进入等待直到底层网络注册完成,网络状态就绪,否则等待至超时。 - 创建一个实例 LuatOS-Air的mqtt操作首先要创建一个实例使用[mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)](https://doc.openluat.com/wiki/21?wiki_page_id=2280)创建一个实例。 - 连接服务器 然后使用[mqttc:connect(host, port, transport, cert, timeout)](https://doc.openluat.com/wiki/21?wiki_page_id=2280)连接服务器。 - 订阅主题 使用[mqttc:subscribe(topic, qos)](https://doc.openluat.com/wiki/21?wiki_page_id=2280)订阅主题。 - 发布一条消息 使用[mqttc:publish(topic, payload, qos, retain)](https://doc.openluat.com/wiki/21?wiki_page_id=2280)发布一条消息。 - 接收消息 使用[mqttc:receive(timeout, msg)](https://doc.openluat.com/wiki/21?wiki_page_id=2280)接收消息。 - 断开与服务器的连接 当网络被断开时使用[mqttc:disconnect()](https://doc.openluat.com/wiki/21?wiki_page_id=2280)断开与服务器的连接。 # 示例 本文以demo\mqtt\sync\sendWaitRecv这个DEMO为例做演示。 1. 我们可以通过MQTT.fx工具来测试mqtt运行状况,我们这里和脚本连接同一个MQTT服务器,用来监视MQTT运行状态。我们脚本里会10秒钟发布一个/qos0topic主题的数据,20秒钟会发布一个/中文qos1topic主题的数据,监视结果如下。可以看到/qos0topic主题的数据的数据是/中文qos1topic主题的数据的2倍。 ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210409165602270_2.png "undefined") 2. 代码分析 - mqttTask.lua实现的是建立一个mqtt的流程和异常处理机制。要先等待网络就绪了才可以做mqtt相关操作,这里的等待超时时间位5分钟,如果模块等待5分钟还没注册上网络的话,会通过进入飞行模式和退出飞行模式的方法重启一下协议栈。 ```lua --启动MQTT客户端任务 sys.taskInit( function() local retryConnectCnt = 0 while true do if not socket.isReady() then retryConnectCnt = 0 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) end if socket.isReady() then local imei = misc.getImei() --创建一个MQTT客户端 local mqttClient = mqtt.client(imei,600,"user","password") else --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end ) ``` - 创建MQTT客户端,并连接,这里clientId用的时设备的IMEI号,设置的心跳时间为600秒,连接类型这里用的是'tcp',支持'tcp'和'tcp_ssl'两种连接方式,可以根据实际需求来选择不同的连接方式。 ```lua --创建一个MQTT客户端 local mqttClient = mqtt.client(imei,600,"user","password") --连接MQTT mqttClient:connect("lbsmqtt.airm2m.com",1884,"tcp") ``` - 主题订阅和数据的发布和接收,这里订阅了topic为"/event0"和"/中文event1"为的2这个主题,这里通过while循环处理接收和发送的数据。 ```lua if mqttClient:subscribe({["/event0"]=0, ["/中文event1"]=1}) then mqttOutMsg.init() --循环处理接收和发送的数据 while true do if not mqttInMsg.proc(mqttClient) then log.error("mqttTask.mqttInMsg.proc error") break end if not mqttOutMsg.proc(mqttClient) then log.error("mqttTask.mqttOutMsg proc error") break end end mqttOutMsg.unInit() end ``` - 数据接收的处理是在mqttInMsg.lua脚本中实现的,这里通过在while循环里调用mqttc:receive()函数来实现的,这里设置的超时时间为60秒,60秒没收到数据的话会跳出次循环。 ```lua --- MQTT客户端数据接收处理 -- @param mqttClient,MQTT客户端对象 -- @return 处理成功返回true,处理出错返回false -- @usage mqttInMsg.proc(mqttClient) function proc(mqttClient) local result,data while true do result,data = mqttClient:receive(60000,"APP_SOCKET_SEND_DATA") --接收到数据 if result then log.info("mqttInMsg.proc",data.topic,string.toHex(data.payload)) --TODO:根据需求自行处理data.payload else break end end return result or data=="timeout" or data=="APP_SOCKET_SEND_DATA" end ``` - 数据发送的处理是在mqttOutMsg.lua中实现的,发送的消息是通过消息队列维护的,要发送的数据先通调用insertMsg()函数插入要发送的消息到msgQueue,等之前的数据发送完成后会通过调用proc()函数从msgQueue队列中取出第一个消息,通过调用mqttClient:publish()函数,真正的把数据发送出去。这里定时发送是通过在回调函数里启动定时的方式实现的。 ```lua --数据发送的消息队列 local msgQueue = {} --插入发发送的数据到消息队列 local function insertMsg(topic,payload,qos,user) table.insert(msgQueue,{t=topic,p=payload,q=qos,user=user}) sys.publish("APP_SOCKET_SEND_DATA") end --主题为"/qos0topic"的消息回调函数 local function pubQos0TestCb(result) log.info("mqttOutMsg.pubQos0TestCb",result) if result then sys.timerStart(pubQos0Test,10000) end end --插入主题为"/qos0topic"的消息到消息队列 function pubQos0Test() insertMsg("/qos0topic","qos0data",0,{cb=pubQos0TestCb}) end --主题为"/中文qos1topic"的消息回调函数 local function pubQos1TestCb(result) log.info("mqttOutMsg.pubQos1TestCb",result) if result then sys.timerStart(pubQos1Test,20000) end end --插入主题为"/中文qos1topic"的消息到消息队列 function pubQos1Test() insertMsg("/中文qos1topic","中文qos1data",1,{cb=pubQos1TestCb}) end --从消息队列中取出消息,发送到服务器 function proc(mqttClient) while #msgQueue>0 do local outMsg = table.remove(msgQueue,1) local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q) if outMsg.user and outMsg.user.cb then outMsg.user.cb(result,outMsg.user.para) end if not result then return end end return true end ``` - log解析,服务器连接,这个也实际上也是通过SOCKET连接的,连接方式TCP,超时时间也是默认的120S ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412112856752_3.png "undefined") 数据发送,可以看出"/qos0topic"主题是10S一次,"/中文qos1topic"主题是20S一次 ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412113215147_4.png "undefined") ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412113057561_5.png "undefined") 数据接收,通过MQTT.fx工具,发送一个主题为"/event0"的消息,模块正常收到,并打印出来。 发送 ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412113821281_6.png "undefined") 接收 ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412113837000_7.png "undefined") 这里打印出的数据是HEX格式的,可以看出接收的数据是也是正确的 ![undefined](https://cdn.openluat-luatcommunity.openluat.com/images/20210412143218456_7.png "undefined") 完整代码,直接下载\demo\mqtt\sync\sendWaitRecv下的脚本即可。 # 常见问题 ## 连接服务器失败 1. 检查下模块信号、网络注册、网络附着、PDP激活状态 2. 检查下SIM卡是否欠费【4G模块有一种欠费表现:无法注册4G网络,可以注册2G网络】 3. 使用mqtt.fx,连接服务器确认一下是否可以连接成功,排除服务器故障 4. 部分国外的开源项目提供免费的MQTT代理服务器,因为网络的原因,国内存在严重的延迟或者丢包现象,导致程序运行出现问题,此现象在AT开发时影响严重 ## 最多同时支持多少个连接 10个。 ## 如何实现掉线自动重连 参考mqtt demo,实现自动重连即可 ## 频繁掉线是什么原因 1. 检查下是否存在代码逻辑错误,导致异常 2. 检查下是否不断重启,导致异常 3. 检查下服务器网络是否稳定,不要用内网穿透方式搭建服务器 4. 检查下使用环境是否网络覆盖不好,例如车库、地下、电梯、山区等 5. 排查是否为设备天线问题 6. 如果经常出现连接被动断开:   1) 检查下mqtt keep alive的时间,一般建议使用2分钟【如果每2分钟内都有应用数据收发,则可以把mqtt keep alive的时间设置的长一点儿】,除非有强制要求,否则不能太长,也不能太短。不建议超过4分钟,基站策略会关闭长时间没有数据传输的连接,太长时间可能会导致连接被基站关闭;不建议少于1分钟,太短时间可能会因为网络环境波动导致上行数据发送超时,可能超过1.5倍的心跳时间,从而被服务器主动断开连接   2) 检查下是否在1.5倍的mqtt keep alive的时间,没有成功发送数据到服务器,就会被被服务器主动断开,这种情况一般都是发送数据超时引起的 ## 如果KeepAlive设置的时间长,会不会被移动断开? 会,一般建议使用2分钟,不建议超过4分钟,基站策略会关闭长时间没有数据传输的连接,太长时间可能会导致连接被基站关闭 ## 如果有很多个mqtt设备连接到服务器,收发数据是怎样区分设备的 通过clientid确保设备唯一性,可以设置IMEI为clientid ## 模块一直狂发 [W]-[socket.client:recv] error CLOSED 连接关闭了,一直在循环接受数据,没退出循环 ## 想让模块在断电的时时刻,通过已连接的MQTT向服务器发一个报警硬件方面建议怎么修改呢? vbat位置接个大的电容,掉电后可以维持一短时间 ## mqtt.client() version MQTT版本号"3.1.1","3.1","5.0"都支持吗 支持3.1和3.1.1