MQTT

# 安然MQTT

```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")

local ser_num = get_var("序列号")

local pub_topic = "/dtu/" .. ser_num .. "/data"
local sub_topic = "/dtu/" .. ser_num .. "/control"
local rep_topic = "/dtu/" .. ser_num .. "/data"

local config = {
  uri = "broker-cn.emqx.io:1883",
  id = "mqtt-test-05" .. ser_num,
  username = ser_num,
  password = "123456",
}

-- MQTT client shared by the callbacks and timers below; nil until init_mqtt() runs.
local mqtt = nil

--- Create the MQTT client, subscribe to the control topic and start it.
function init_mqtt()
  mqtt = create_mqtt_client(config, on_message, on_event)
  mqtt:subscribe(sub_topic, 0)
  mqtt:run()
end

--- Handle a message received from the broker.
-- The payload is expected to be a JSON object carrying the control fields
-- copied below. They are written to the local variables only while the
-- "PLC状态:设备001#" variable reads 0; "success" is then published on rep_topic.
-- @param topic   topic the message arrived on
-- @param payload raw message body (JSON text)
-- @param qos     QoS value passed in by the client
function on_message(topic, payload, qos)
  print(topic, payload, qos)

  -- cjson.decode raises an error on malformed input; guard it so that a bad
  -- message is dropped instead of raising inside the callback.
  local ok, data = pcall(cjson.decode, payload)
  if not ok or type(data) ~= "table" then
    print("on_message: ignoring payload that is not a JSON object:", data)
    return
  end

  -- Read the device state.
  local plc_status = get_var("PLC状态:设备001#")
  if plc_status == 0 then
    -- Write all control fields in one batch.
    set_var_batch({
      equipment_num = data.equipment_num,
      fo_num = data.fo_num,
      wo_num = data.wo_num,
      process_name = data.process_name,
      step_num = data.step_num,
      start_date = data.start_date,
      end_date = data.end_date,
    })
    -- Report that the message was processed.
    mqtt:publish(rep_topic, "success")
  end

  --[[
  -- Access the parsed data
  print("Equipment Number:", data.equipmentNum)
  print("FO Number:", data.foNum)
  print("WO Number:", data.woNum)
  print("Process Name:", data.processName)
  print("Step Number:", data.stepNum)
  print("Start Date:", data.startDate)
  print("End Date:", data.endDate)
  ]]--
end

--- Log connection events reported by the MQTT client.
function on_event(connected, type, msg)
  print(connected, type, msg)
end

init_mqtt()

-- Periodic status report (timer argument: 5).
create_timer(function()
  -- NOTE(review): this value is never read in this script. It used to be
  -- assigned to an accidental global `now`; confirm nothing else relied on it.
  local now = get_time_second()

  -- Collect the variables to report.
  local report = {
    equipment_num = get_var("equipment_num"),
    product_version = get_var("product_version"),
    process_name = get_var("process_name"),
    step_num = get_var("step_num"),
    create_date = os.time(),
    -- end_date = get_var("end_date"), -- disabled in the original script
    product_status = get_var("product_status"),
    user_name = get_var("user_name"),
  }

  -- Serialise to JSON and publish.
  local jsonData = cjson.encode(report)
  log("pub topic:", pub_topic)
  log("jsonData:", jsonData)
  mqtt:publish(pub_topic, jsonData)
end, 5)

-- Connection watchdog (timer argument: 20; the original comment claimed
-- "every 60 seconds", which did not match the value).
create_timer(function()
  local status = mqtt:is_connected()
  --log("connect status==",status);
  if status == false then
    --log("reconnect");
    mqtt:run()
  end
end, 20)

run()
```

# 研究院MQTT

## 改版前

```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")

local ORGID = "226761"
local ST = get_var("序列号")

local pub_topic = ORGID .. "/YC/UP"
local reg_topic = ORGID .. "/REG/UP"
local sub_topic = ORGID .. "/YC/DOWN"

local config = {
  uri = "broker-cn.emqx.io:1883",
  id = "mqtt-test-08" .. ST,
  username = "whxf",
  password = "whxf0001"
}

-- MQTT client shared by the callbacks and timers below; nil until init_mqtt() runs.
local mqtt = nil

-- Seed the generator so the traceId suffix differs between restarts
-- (Lua versions before 5.4 start math.random from a fixed seed).
math.randomseed(os.time())

--- Create the MQTT client, subscribe to the downlink topic and start it.
function init_mqtt()
  mqtt = create_mqtt_client(config, on_message, on_event)
  mqtt:subscribe(sub_topic, 0)
  mqtt:run()
end

--- Log every message received from the broker.
function on_message(topic, payload, qos)
  print(topic, payload, qos)
end

--- Log connection events reported by the MQTT client.
function on_event(connected, type, msg)
  print(connected, type, msg)
end

--- Return the current time in milliseconds (a 13-digit timestamp).
-- The previous version added os.clock() * 1000 to os.time() * 1000, but
-- os.clock() is the CPU time used by the program, not the millisecond part of
-- the current second, so the result drifted away from wall-clock time.
-- @return integer-valued number of milliseconds
function get_current_timestamp()
  if socket.gettime then
    return math.floor(socket.gettime() * 1000)
  end
  -- Fallback with whole-second resolution if the socket module lacks gettime.
  return os.time() * 1000
end

--- Return a random four-digit number (1000..9999).
function generate_four_digit_random()
  return math.random(1000, 9999)
end

init_mqtt()

-- Connection watchdog (timer argument: 5; the original comment claimed
-- "every 60 seconds", which did not match the value).
create_timer(function()
  local status = mqtt:is_connected()
  --log("connect status==",status);
  if status == false then
    --log("reconnect");
    mqtt:run()
  end
end, 5)

-- Registration report (timer argument: 60): publishes the tag codes of all
-- variables whose name has the form "<meter_id>|<tag_code>".
create_timer(function()
  local TT = get_current_timestamp()
  local random_number = generate_four_digit_random()
  local traceId = ORGID .. "-" .. ST .. "-" .. TT .. "-" .. random_number
  --print(traceId)

  local dataTable = {}
  local vars = get_var_list()
  for _, var in pairs(vars) do
    local name = var[2]
    -- Plain-text search for the first "|" separator.
    local sep = string.find(name, "|", 1, true)
    if sep then
      table.insert(dataTable, string.sub(name, sep + 1))
    end
  end

  -- Build the final JSON structure.
  local jsonData = {
    traceId = traceId,
    TT = TT,
    ST = ST,
    tags = dataTable
  }

  local vardata = cjson.encode(jsonData)
  --print(vardata)
  mqtt:publish(reg_topic, vardata)
end, 60)

socket.sleep(1)

-- Telemetry report (timer argument: 300): publishes the value of every
-- variable whose name has the form "<meter_id>|<tag_code>".
create_timer(function()
  local TT = get_current_timestamp()
  local random_number = generate_four_digit_random()
  local traceId = ORGID .. "-" .. ST .. "-" .. TT .. "-" .. random_number
  --print(traceId)

  local dataTable = {}
  local vars = get_var_list()
  for _, var in pairs(vars) do
    local name = var[2]
    local value = var[5]
    -- Plain-text search for the first "|" separator.
    local sep = string.find(name, "|", 1, true)
    if sep then
      table.insert(dataTable, {
        meter_id = string.sub(name, 1, sep - 1),
        tag_code = string.sub(name, sep + 1),
        value = value,
        q = 1
      })
    end
  end

  -- Build the final JSON structure.
  local jsonData = {
    traceId = traceId,
    TT = TT,
    ST = ST,
    data = dataTable
  }

  local vardata = cjson.encode(jsonData)
  --print(vardata)
  mqtt:publish(pub_topic, vardata)
end, 300)

run()
```

## 改版后

```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")

local ORGID = "226761"
local ST = get_var("序列号")

-- Static station description sent with every registration message.
local reg_table = {
  pumb_name = "XXX雨量计",
  pumb_type = "meter",
  ver_id = "100001",
  cfg_id = "200001",
  address = "",
  lon = "155.21",
  lat = "32.21",
  wan = "true",
  lan = "true",
  ip = "192.168.1.177",
  serial = "false",
  info = "1"
}

local pub_topic = ORGID .. "/YC/UP"
local reg_topic = ORGID .. "/REG/UP"
local sub_topic = ORGID .. "/YC/DOWN"

local config = {
  uri = "broker-cn.emqx.io:1883",
  id = "mqtt-test-08" .. ST,
  username = "whxf",
  password = "whxf0001"
}

-- MQTT client shared by the callbacks and timers below; nil until init_mqtt() runs.
local mqtt = nil

-- Seed the generator so the traceId suffix differs between restarts
-- (Lua versions before 5.4 start math.random from a fixed seed).
math.randomseed(os.time())

--- Create the MQTT client, subscribe to the downlink topic and start it.
function init_mqtt()
  mqtt = create_mqtt_client(config, on_message, on_event)
  mqtt:subscribe(sub_topic, 0)
  mqtt:run()
end

--- Log every message received from the broker.
function on_message(topic, payload, qos)
  print(topic, payload, qos)
end

--- Log connection events reported by the MQTT client.
function on_event(connected, type, msg)
  print(connected, type, msg)
end

--- Return the current time in milliseconds (a 13-digit timestamp).
-- The previous version added os.clock() * 1000 to os.time() * 1000, but
-- os.clock() is the CPU time used by the program, not the millisecond part of
-- the current second, so the result drifted away from wall-clock time.
-- @return integer-valued number of milliseconds
function get_current_timestamp()
  if socket.gettime then
    return math.floor(socket.gettime() * 1000)
  end
  -- Fallback with whole-second resolution if the socket module lacks gettime.
  return os.time() * 1000
end

--- Return a random four-digit number (1000..9999).
function generate_four_digit_random()
  return math.random(1000, 9999)
end

init_mqtt()

-- Connection watchdog (timer argument: 5; the original comment claimed
-- "every 60 seconds", which did not match the value).
create_timer(function()
  local status = mqtt:is_connected()
  --log("connect status==",status);
  if status == false then
    --log("reconnect");
    mqtt:run()
  end
end, 5)

-- Registration (timer argument: 10): groups the variables named
-- "<meter_name>|<meter_id>|<tag_code>" by meter name and publishes the
-- resulting meter list together with the static reg_table fields.
create_timer(function()
  local TT = get_current_timestamp()
  local random_number = generate_four_digit_random()
  local traceId = ORGID .. "-" .. ST .. "-" .. TT .. "-" .. random_number
  --print(traceId)

  local meters_table = {}   -- meters in first-seen order (this is what gets sent)
  local meter_by_name = {}  -- meter_name -> entry in meters_table, for O(1) lookup
  local vars = get_var_list()
  for _, var in pairs(vars) do
    local name = var[2]
    local meter_name, meter_id, tag_code = string.match(name, "(.-)|(.-)|(.*)")
    if meter_name and meter_id then
      local meter = meter_by_name[meter_name]
      if meter then
        -- Known meter: just add the tag code.
        table.insert(meter.tags, tag_code)
      else
        -- First tag of this meter: create its entry.
        meter = {
          meter_id = meter_id,
          meter_name = meter_name,
          meter_type = "pump",
          tags = { tag_code }
        }
        meter_by_name[meter_name] = meter
        table.insert(meters_table, meter)
      end
    end
  end

  -- Build the final JSON structure.
  local jsonData = {
    traceId = traceId,
    TT = TT,
    ST = ST,
    meters = meters_table,
    pumb_name = reg_table.pumb_name,
    pumb_type = reg_table.pumb_type,
    ver_id = reg_table.ver_id,
    cfg_id = reg_table.cfg_id,
    address = reg_table.address,
    lon = reg_table.lon,
    lat = reg_table.lat,
    wan = reg_table.wan,
    lan = reg_table.lan,
    ip = reg_table.ip,
    serial = reg_table.serial,
    info = reg_table.info
  }

  local vardata = cjson.encode(jsonData)
  --print(vardata)
  mqtt:publish(reg_topic, vardata)
end, 10)

socket.sleep(1)

-- Telemetry (timer argument: 30): publishes the value of every variable
-- named "<meter_name>|<meter_id>|<tag_code>".
create_timer(function()
  local TT = get_current_timestamp()
  local random_number = generate_four_digit_random()
  local traceId = ORGID .. "-" .. ST .. "-" .. TT .. "-" .. random_number
  --print(traceId)

  local dataTable = {}
  local vars = get_var_list()
  for _, var in pairs(vars) do
    local name = var[2]
    local value = var[5]
    local meter_name, meter_id, tag_code = string.match(name, "(.-)|(.-)|(.*)")
    if meter_name and meter_id and tag_code then
      table.insert(dataTable, {
        meter_id = meter_id,
        tag_code = tag_code,
        value = value,
        q = 1
      })
    end
  end

  -- Build the final JSON structure.
  local jsonData = {
    traceId = traceId,
    TT = TT,
    ST = ST,
    data = dataTable
  }

  local vardata = cjson.encode(jsonData)
  --print(vardata)
  mqtt:publish(pub_topic, vardata)
end, 30)

run()
```

# 安然最后

```lua
require("global")
local cjson = require("cjson")
local socket = require("socket")

-- Only referenced by the disabled change-detection code in the report timer.
local last_product_status

local ser_num = get_var("序列号")

--local pub_topic = "/dtu/"..ser_num.."/data"
local pub_topic = "ANRAN/UPDATE"
local sub_topic = "/dtu/" .. ser_num .. "/control"
local rep_topic = "/dtu/" .. ser_num .. "/data"

local config = {
  uri = "256i00r573.wicp.vip:30196",
  --uri = "broker-cn.emqx.io:1883",
  id = "anran-" .. ser_num,
  username = "anran",
  password = "anran@123456",
}

-- MQTT client shared by the callbacks and timers below; nil until init_mqtt() runs.
local mqtt = nil

--- Create the MQTT client, subscribe to the control topic and start it.
function init_mqtt()
  mqtt = create_mqtt_client(config, on_message, on_event)
  mqtt:subscribe(sub_topic, 0)
  mqtt:run()
end

--- Handle a message received from the broker.
-- The payload is expected to be a JSON object carrying the control fields
-- copied below. They are written to the local variables only while the
-- "PLC状态:设备001#" variable reads 0; "success" is then published on rep_topic.
-- @param topic   topic the message arrived on
-- @param payload raw message body (JSON text)
-- @param qos     QoS value passed in by the client
function on_message(topic, payload, qos)
  print(topic, payload, qos)

  -- cjson.decode raises an error on malformed input; guard it so that a bad
  -- message is dropped instead of raising inside the callback.
  local ok, data = pcall(cjson.decode, payload)
  if not ok or type(data) ~= "table" then
    print("on_message: ignoring payload that is not a JSON object:", data)
    return
  end

  -- Read the device state.
  local plc_status = get_var("PLC状态:设备001#")
  if plc_status == 0 then
    set_var_batch({
      equipment_num = data.equipment_num,
      fo_num = data.fo_num,
      wo_num = data.wo_num,
      process_name = data.process_name,
      step_num = data.step_num,
      start_date = data.start_date,
      end_date = data.end_date,
    })
    -- Report that the message was processed.
    mqtt:publish(rep_topic, "success")
  end
end

--- Log connection events reported by the MQTT client.
function on_event(connected, type, msg)
  print(connected, type, msg)
end

--- Wrap `data` in a {type = "FEED_BACK", data = ...} envelope and publish it
-- on pub_topic, passing 1 as the third publish argument (presumably the QoS).
-- @param data table placed under the "data" key
-- @return the values returned by mqtt:publish
local function publish_feedback(data)
  local jsonData = cjson.encode({ type = "FEED_BACK", data = data })
  -- log("pub topic:", pub_topic)
  -- log("jsonData:", jsonData)
  return mqtt:publish(pub_topic, jsonData, 1)
end

init_mqtt()

-- Report timer (timer argument: 1). The "up_data_bit" variable selects what to
-- send: 1 = regular feedback, 2 = abnormal product, 3 = completed quantity.
-- It is reset to 0 at the end of every tick.
create_timer(function()
  -- Only referenced by the disabled change-detection code below.
  local product_status = get_var("product_status")
  local up_data_bit = get_var("up_data_bit")

  -- NOTE(review): this value is never read in this script. It used to be
  -- assigned to an accidental global `now`; confirm nothing else relied on it.
  local now = get_time_second()

  -- Current time formatted as "YYYY-MM-DD HH:MM:SS".
  local create_date = os.date("%Y-%m-%d %H:%M:%S", os.time())

  --if (product_status ~= last_product_status) then
  if up_data_bit == 1 then
    publish_feedback({
      equipmentNum = get_var("equipment_num"),
      foNum = get_var("fo_num"),
      processName = get_var("process_name"),
      woNum = get_var("wo_num"),
      createDate = create_date,
      productStatus = get_var("product_status"),
      userName = get_var("user_name"),
    })
  elseif up_data_bit == 2 then
    -- 2 means an abnormal product.
    publish_feedback({
      type = "PRODUCT_ABNORMAL",
      equipmentNum = get_var("equipment_num"),
      foNum = get_var("fo_num"),
      processName = get_var("process_name"),
      woNum = get_var("wo_num"),
      -- NOTE(review): epoch seconds here, while the other branches send the
      -- formatted create_date string; behaviour kept as in the original.
      createDate = os.time(),
      productStatus = get_var("product_status"),
    })
  elseif up_data_bit == 3 then
    -- 3 means completed quantity.
    local e, f = publish_feedback({
      type = "COMPLETE_WEIGHT",
      foNum = get_var("fo_num"),
      woNum = get_var("wo_num"),
      -- NOTE(review): the original read get_var("weight_date") but sent the
      -- current time instead; behaviour kept, confirm which one is intended.
      weightDate = create_date,
      productQty = get_var("product_qty"),
      productVersion = get_var("product_version"),
    })
    print("res-a:", e)
    print("res-b:", f)
  end

  set_var("up_data_bit", 0)
  --last_product_status = product_status
end, 1)

-- Connection watchdog (timer argument: 20; the original comment claimed
-- "every 60 seconds", which did not match the value).
create_timer(function()
  local status = mqtt:is_connected()
  --log("connect status==",status);
  if status == false then
    --log("reconnect");
    mqtt:run()
  end
end, 20)

run()
```