MQTT
# 安然MQTT
```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")
local ser_num = get_var("序列号");
local pub_topic = "/dtu/"..ser_num.."/data"
local sub_topic = "/dtu/"..ser_num.."/control"
local rep_topic = "/dtu/"..ser_num.."/data"
local config = {
uri = "broker-cn.emqx.io:1883",
id = "mqtt-test-05"..ser_num,
username = ser_num,
password = "123456",
}
local mqtt = nil -- 全局mqtt对象,初始值为nil
-- 定义一个函数来初始化MQTT连接
function init_mqtt()
mqtt = create_mqtt_client(config, on_message, on_event)
mqtt:subscribe(sub_topic, 0)
mqtt:run()
end
function on_message(topic, payload, qos)
print(topic, payload, qos)
-- 解析JSON数据
local data = cjson.decode(payload)
-- 获取设备状态
local plc_status = get_var("PLC状态:设备001#");
if (plc_status == 0) then
-- 处理成功后发布成功接收的消息
local ctrl_tbl = { equipment_num = data.equipment_num, fo_num = data.fo_num,
wo_num = data.wo_num , process_name = data.process_name, step_num = data.step_num
, start_date = data.start_date, end_date = data.end_date };
--批量设置变量
set_var_batch(ctrl_tbl);
--处理完后返回数据
local c, d = mqtt:publish(rep_topic, "success")
else
end
--[[
-- 访问解析后的数据
print("Equipment Number:", data.equipmentNum)
print("FO Number:", data.foNum)
print("WO Number:", data.woNum)
print("Process Name:", data.processName)
print("Step Number:", data.stepNum)
print("Start Date:", data.startDate)
print("End Date:", data.endDate)
]]--
end
function on_event(connected, type, msg)
print(connected, type, msg)
end
init_mqtt()
create_timer(function()
now = get_time_second()
--获取变量信息
local equipment_num = get_var("equipment_num");
local product_version = get_var("product_version");
local process_name = get_var("process_name");
local step_num = get_var("step_num");
local start_date = get_var("start_date");
local end_date = get_var("end_date");
local product_status = get_var("product_status");
local user_name = get_var("user_name");
local create_date = os.time()
--创建table
local myTable = {}
myTable["equipment_num"] = equipment_num;
myTable["product_version"] = product_version;
myTable["process_name"] = process_name;
myTable["step_num"] = step_num;
myTable["create_date"] = create_date;
--myTable["end_date"] = end_date;
myTable["product_status"] = product_status;
myTable["user_name"] = user_name;
--数据转json
local jsonData = cjson.encode(myTable)
log("pub topic:", pub_topic)
log("jsonData:", jsonData)
local a, b = mqtt:publish(pub_topic, jsonData)
end, 5)
local i = 0
create_timer(function()
i = i + 1
local status = mqtt:is_connected()
--log("connect status==",status);
if status==false then
--log("connect status==",status);
--log("reconnect");
mqtt:run()
end
end, 20) -- 60秒检测一次连接状态
run()
```
# 研究院MQTT
## 改版前
```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")
local ORGID = "226761"
local ST = get_var("序列号");
local pub_topic = ORGID.."/YC/UP"
local reg_topic = ORGID.."/REG/UP"
local sub_topic = ORGID.."/YC/DOWN"
local config = {
uri = "broker-cn.emqx.io:1883",
id = "mqtt-test-08"..ST,
username = "whxf",
password = "whxf0001"
}
local mqtt = nil -- 全局mqtt对象,初始值为nil
-- 定义一个函数来初始化MQTT连接
function init_mqtt()
mqtt = create_mqtt_client(config, on_message, on_event)
mqtt:subscribe(sub_topic, 0)
mqtt:run()
end
function on_message(topic, payload, qos)
print(topic, payload, qos)
end
function on_event(connected, type, msg)
print(connected, type, msg)
end
function get_current_timestamp()
--获取13位时间戳
local current_time = os.time()
local milliseconds = math.floor(os.clock() * 1000) -- 获取当前毫秒数
local timestamp = current_time * 1000 + milliseconds
return timestamp
end
function generate_four_digit_random()
-- 生成一个四位的随机数
local random_number = math.random(1000, 9999)
return random_number
end
init_mqtt()
create_timer(function()
local status = mqtt:is_connected()
--log("connect status==",status);
if status==false then
--log("connect status==",status);
--log("reconnect");
mqtt:run()
else
end
end, 5) -- 60秒检测一次连接状态
create_timer(function()
-- 调用函数获取13位时间戳
local TT = get_current_timestamp()
local random_number = generate_four_digit_random()
local traceId = ORGID.."-"..ST.."-"..TT.."-"..random_number
--print(traceId)
local dataTable = {};
local vars = get_var_list();
for i,var in pairs(vars) do
local name=var[2];
-- 使用 string.find 函数查找 "|"
local startIndex, endIndex = string.find(name, "|")
if startIndex then
local meter_id = string.sub(name, 1, startIndex - 1)
local tag_code = string.sub(name, endIndex + 1)
table.insert(dataTable, tag_code)
else
end
end
-- 创建最终的JSON结构
local jsonData = {
traceId = traceId;
TT = TT;
ST = ST;
tags = dataTable
}
--数据转json
local vardata = cjson.encode(jsonData)
--print(vardata)
local a, b = mqtt:publish(reg_topic, vardata)
end, 60)
socket.sleep(1)
create_timer(function()
-- 调用函数获取13位时间戳
local TT = get_current_timestamp()
local random_number = generate_four_digit_random()
local traceId = ORGID.."-"..ST.."-"..TT.."-"..random_number
--print(traceId)
local dataTable = {};
local vars = get_var_list();
--local meter_id = "sdfsdf"
for i,var in pairs(vars) do
local name=var[2];
local value = var[5];
-- 使用 string.find 函数查找 "|"
local startIndex, endIndex = string.find(name, "|")
if startIndex then
local meter_id = string.sub(name, 1, startIndex - 1)
local tag_code = string.sub(name, endIndex + 1)
local dataEntry = {
meter_id = meter_id,
tag_code = tag_code,
value = value,
q = 1
};
table.insert(dataTable, dataEntry)
else
end
end
-- 创建最终的JSON结构
local jsonData = {
traceId = traceId;
TT = TT;
ST = ST;
data = dataTable
}
--数据转json
local vardata = cjson.encode(jsonData)
--print(vardata)
local a, b = mqtt:publish(pub_topic, vardata)
end, 300)
run()
```
## 改版后
```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")
local ORGID = "226761"
local ST = get_var("序列号")
local reg_table = {
pumb_name = "XXX雨量计",
pumb_type = "meter",
ver_id = "100001",
cfg_id = "200001",
address = "",
lon = "155.21",
lat = "32.21",
wan = "true",
lan = "true",
ip="192.168.1.177",
serial ="false",
info="1"
}
local pub_topic = ORGID.."/YC/UP"
local reg_topic = ORGID.."/REG/UP"
local sub_topic = ORGID.."/YC/DOWN"
local config = {
uri = "broker-cn.emqx.io:1883",
id = "mqtt-test-08"..ST,
username = "whxf",
password = "whxf0001"
}
local mqtt = nil -- 全局mqtt对象,初始值为nil
-- 定义一个函数来初始化MQTT连接
function init_mqtt()
mqtt = create_mqtt_client(config, on_message, on_event)
mqtt:subscribe(sub_topic, 0)
mqtt:run()
end
function on_message(topic, payload, qos)
print(topic, payload, qos)
end
function on_event(connected, type, msg)
print(connected, type, msg)
end
function get_current_timestamp()
--获取13位时间戳
local current_time = os.time()
local milliseconds = math.floor(os.clock() * 1000) -- 获取当前毫秒数
local timestamp = current_time * 1000 + milliseconds
return timestamp
end
function generate_four_digit_random()
-- 生成一个四位的随机数
local random_number = math.random(1000, 9999)
return random_number
end
init_mqtt()
create_timer(function()
local status = mqtt:is_connected()
--log("connect status==",status);
if status==false then
--log("connect status==",status);
--log("reconnect");
mqtt:run()
else
end
end, 5) -- 60秒检测一次连接状态
--注册
create_timer(function()
-- 调用函数获取13位时间戳
local TT = get_current_timestamp()
local random_number = generate_four_digit_random()
local traceId = ORGID.."-"..ST.."-"..TT.."-"..random_number
--print(traceId)
local meters_table = {};
local tags_table = {};
local vars = get_var_list();
for i,var in pairs(vars) do
local name=var[2];
-- 使用 string.find 函数查找 "|"
--local startIndex, endIndex = string.find(name, "|")
--if startIndex then
-- local meter_id = string.sub(name, 1, startIndex - 1)
-- local tag_code = string.sub(name, endIndex + 1)
-- table.insert(dataTable, tag_code)
--else
--end
local meter_name, meter_id, tag_code = string.match(name, "(.-)|(.-)|(.*)")
if meter_name and meter_id then
-- 尝试在 meters_table 中查找相同 meter_name 的表
local meter_found = false
for _, meter in ipairs(meters_table) do
if meter.meter_name == meter_name then
-- meter_name 已存在,将 tag_code 添加到 tags 中
table.insert(meter.tags, tag_code)
meter_found = true
break
end
end
if not meter_found then
-- meter_name 不存在,创建新的 meter 表并添加到 meters_table 中
local new_meter = {
meter_id = meter_id,
meter_name = meter_name,
meter_type = "pump",
tags = {tag_code}
}
table.insert(meters_table, new_meter)
end
else
end
end
-- 创建最终的JSON结构
local jsonData = {
traceId = traceId;
TT = TT;
ST = ST;
meters = meters_table;
pumb_name = reg_table.pumb_name,
pumb_type = reg_table.pumb_type,
ver_id = reg_table.ver_id,
cfg_id = reg_table.cfg_id,
address = reg_table.address,
lon = reg_table.lon,
lat = reg_table.lat,
wan = reg_table.wan,
lan = reg_table.lan,
ip = reg_table.ip,
serial = reg_table.serial,
info = reg_table.info
}
--数据转json
local vardata = cjson.encode(jsonData)
--print(vardata)
local a, b = mqtt:publish(reg_topic, vardata)
end, 10)
socket.sleep(1)
--遥测
create_timer(function()
-- 调用函数获取13位时间戳
local TT = get_current_timestamp()
local random_number = generate_four_digit_random()
local traceId = ORGID.."-"..ST.."-"..TT.."-"..random_number
--print(traceId)
local dataTable = {};
local vars = get_var_list();
--local meter_id = "sdfsdf"
for i,var in pairs(vars) do
local name=var[2];
local value = var[5];
-- 使用 string.find 函数查找 "|"
--local startIndex, endIndex = string.find(name, "|")
--if startIndex then
-- local meter_id = string.sub(name, 1, startIndex - 1)
-- local tag_code = string.sub(name, endIndex + 1)
-- local dataEntry = {
-- meter_id = meter_id,
-- tag_code = tag_code,
-- value = value,
-- q = 1
-- };
-- table.insert(dataTable, dataEntry)
--else
-- end
local meter_name, meter_id, tag_code = string.match(name, "(.-)|(.-)|(.*)")
if meter_name and meter_id and tag_code then
local dataEntry = {
meter_id = meter_id,
tag_code = tag_code,
value = value,
q = 1
};
table.insert(dataTable, dataEntry)
else
end
end
-- 创建最终的JSON结构
local jsonData = {
traceId = traceId;
TT = TT;
ST = ST;
data = dataTable
}
--数据转json
local vardata = cjson.encode(jsonData)
--print(vardata)
local a, b = mqtt:publish(pub_topic, vardata)
end, 30)
run()
```
# 安然最后
```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")
local last_product_status
local ser_num = get_var("序列号");
--local pub_topic = "/dtu/"..ser_num.."/data"
local pub_topic = "ANRAN/UPDATE"
local sub_topic = "/dtu/"..ser_num.."/control"
local rep_topic = "/dtu/"..ser_num.."/data"
local config = {
uri = "256i00r573.wicp.vip:30196",
--uri = "broker-cn.emqx.io:1883",
id = "anran-"..ser_num,
username = "anran",
password = "anran@123456",
}
local mqtt = nil -- 全局mqtt对象,初始值为nil
-- 定义一个函数来初始化MQTT连接
function init_mqtt()
mqtt = create_mqtt_client(config, on_message, on_event)
mqtt:subscribe(sub_topic, 0)
mqtt:run()
end
function on_message(topic, payload, qos)
print(topic, payload, qos)
-- 解析JSON数据
local data = cjson.decode(payload)
-- 获取设备状态
local plc_status = get_var("PLC状态:设备001#");
if (plc_status == 0) then
-- 处理成功后发布成功接收的消息
local ctrl_tbl = { equipment_num = data.equipment_num, fo_num = data.fo_num,
wo_num = data.wo_num , process_name = data.process_name, step_num = data.step_num
, start_date = data.start_date, end_date = data.end_date };
set_var_batch(ctrl_tbl);
--返回信息
local c, d = mqtt:publish(rep_topic, "success")
else
end
end
function on_event(connected, type, msg)
print(connected, type, msg)
end
init_mqtt()
create_timer(function()
local product_status = get_var("product_status");
local up_data_bit = get_var("up_data_bit");
now = get_time_second()
local current_time = os.time()
-- 将时间戳转换为年月日时分秒的格式
local create_date = os.date("%Y-%m-%d %H:%M:%S", current_time)
--if (product_status ~= last_product_status) then
if (up_data_bit ==1) then
--获取变量信息
local equipment_num = get_var("equipment_num");
local fo_num = get_var("fo_num");
local wo_num = get_var("wo_num");
local process_name = get_var("process_name");
local product_status = get_var("product_status");
local user_name = get_var("user_name");
local step_num = get_var("step_num");
local start_date = get_var("start_date");
local end_date = get_var("end_date");
--创建table
local myTable = {}
myTable["equipmentNum"] = equipment_num;
myTable["foNum"] = fo_num;
myTable["processName"] = process_name;
myTable["woNum"] = wo_num;
myTable["createDate"] = create_date;
myTable["productStatus"] = product_status;
myTable["userName"] = user_name;
local dataTable = {}
dataTable["type"] ="FEED_BACK"
dataTable["data"] = myTable
--数据转json
local jsonData = cjson.encode(dataTable)
-- log("pub topic:", pub_topic)
-- log("jsonData:", jsonData)
mqtt:publish(pub_topic, jsonData,1)
end
--2表示异常
if (up_data_bit == 2) then
--获取变量信息
local equipment_num = get_var("equipment_num");
local fo_num = get_var("fo_num");
local wo_num = get_var("wo_num");
local process_name = get_var("process_name");
local product_status = get_var("product_status");
local create_date = os.time()
--创建table
local myTable = {}
myTable["type"] = "PRODUCT_ABNORMAL"
myTable["equipmentNum"] = equipment_num;
myTable["foNum"] = fo_num;
myTable["processName"] = process_name;
myTable["woNum"] = wo_num;
myTable["createDate"] = create_date;
myTable["productStatus"] = product_status;
local dataTable = {}
dataTable["type"] ="FEED_BACK"
dataTable["data"] = myTable
--数据转json
local jsonData = cjson.encode(dataTable)
-- log("pub topic:", pub_topic)
-- log("jsonData:", jsonData)
mqtt:publish(pub_topic, jsonData,1)
end
--3表示完工数量
if (up_data_bit == 3) then
--获取变量信息
local equipment_num = get_var("equipment_num");
local fo_num = get_var("fo_num");
local wo_num = get_var("wo_num");
local weightDate = get_var("weight_date");
local productQty = get_var("product_qty");
local productVersion = get_var("product_version");
--创建table
local myTable = {}
myTable["type"] = "COMPLETE_WEIGHT"
myTable["foNum"] = fo_num;
myTable["woNum"] = wo_num;
myTable["weightDate"] = create_date;
myTable["productQty"] = productQty;
myTable["productVersion"] = productVersion;
local dataTable = {}
dataTable["type"] ="FEED_BACK"
dataTable["data"] = myTable
--数据转json
local jsonData = cjson.encode(dataTable)
-- log("pub topic:", pub_topic)
-- log("jsonData:", jsonData)
local e,f = mqtt:publish(pub_topic, jsonData,1)
print("res-a:",e)
print("res-b:",f)
end
set_var("up_data_bit", 0);
--last_product_status = product_status
end, 1)
local i = 0
create_timer(function()
i = i + 1
local status = mqtt:is_connected()
--log("connect status==",status);
if status==false then
--log("connect status==",status);
--log("reconnect");
mqtt:run()
end
end, 20) -- 60秒检测一次连接状态
run()
```