MQTT Demo

```language #include <stdio.h> #include <string.h> #include <stdlib.h> #include <stdint.h> #include <time.h> #include <ctype.h> #include <stddef.h> #include <stdio.h> #include <stdint.h> #include "teldef.h" #include "osa.h" #include "UART.h" #include "sys.h" #include "sdk_api.h" /* #include "fbf_parse.h" #include "osa.h" #include "atfota.h" #include "cmux_api.h" #include "mi_api.h" #include "download.h" #include "I2C_PMIC_Config.h" #include "libhttpclient.h" */ #include "MQTTClient.h" #undef am_printf #define am_printf(fmt, args...) fatal_printf("MqttTest: "fmt, ##args) #define sleep(x) OSATaskSleep((x) * 200)//second #define _TASK_STACK_SIZE 1024*10 static UINT32 _task_stack[_TASK_STACK_SIZE/sizeof(UINT32)]; static OSTaskRef _task_ref = NULL; static OSATimerRef _timer_ref = NULL; static OSFlagRef _flag_ref = NULL; #define TASK_TIMER_CHANGE_FLAG_BIT 0x01 static void _timer_callback(UINT32 tmrId); static void _task(void *ptr); char MQTT_IP[] = "183.230.40.39"; /* onenet MQTT IP地址 */ int MQTT_PORT = 6002; /* onenet MQTT 端口号 */ char MQTTCLIENT_ID[] = "600324264"; /* onenet 设备ID */ char MQTTUSERNAME[] = "324875"; /* onenet 产品ID 600324264*/ char MQTTPASSWORD[] = "hT=wtearqIsmztZ=S5CN4ttqOig="; /* onenet 设备鉴权 APIKey 信息 */ char MQTT_SUB_TOPIC[] = "mqtt/sample/#"; char MQTT_PUB_TOPIC[] = "mqtt/sample/mifi"; #if !MQTT_THREAD_ARG_FLAG /*for create thread, arg input is abnormal, we use global var*/ MQTTClient mqtt_client; #endif // Device bootup hook before Phase1Inits. // If you have some work to be init, you may implete it here. // ex: you may start your task here. or do some initialize here. extern void Phase1Inits_enter(void); // Device bootup hook after Phase1Inits. // If you have some work to be init, you may implete it here. // ex: you may start your task here. or do some initialize here. extern void Phase1Inits_exit(void); // Device bootup hook before Phase2Inits. // If you have some work to be init, you may implete it here. // ex: you may start your task here. or do some initialize here. extern void Phase2Inits_enter(void); // Device bootup hook after Phase2Inits. // If you have some work to be init, you may implete it here. // ex: you may start your task here. or do some initialize here. extern void Phase2Inits_exit(void); void Phase1Inits_enter(void) { } void Phase1Inits_exit(void) { } void Phase2Inits_enter(void) { } void Phase2Inits_exit(void) { int ret; ret = OSAFlagCreate(&_flag_ref); ASSERT(ret == OS_SUCCESS); ret = OSATimerCreate(&_timer_ref); ASSERT(ret == OS_SUCCESS); ret = OSATaskCreate(&_task_ref, _task_stack, _TASK_STACK_SIZE, 150, "test-task", _task, NULL); ASSERT(ret == OS_SUCCESS); OSATimerStart(_timer_ref, 20 * 200, 20 * 200, _timer_callback, 0); // 10 seconds timer } void messageArrived(MessageData* data) { am_printf("Recv by topic:%.*s message:%.*s \n",data->topicName->lenstring.len,data->topicName->lenstring.data,data->message->payloadlen,data->message->payload); } static void _timer_callback(UINT32 tmrId) { OSAFlagSet(_flag_ref, TASK_TIMER_CHANGE_FLAG_BIT, OSA_FLAG_OR); } extern int SendATCMDWaitResp(int sATPInd,char *in_str, int timeout, char *ok_fmt, int ok_flag, char *err_fmt, char *out_str, int resplen); static void _task(void *ptr) { #if MQTT_THREAD_ARG_FLAG /* connect to m2m.eclipse.org, subscribe to a topic, send and receive messages regularly every 1 sec */ MQTTClient mqtt_client; #endif Network network; unsigned char sendBuf[128],readBuf[128]; int rc = 0,count = 0; MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer; // MQTT_SLEEP(30); char at_str[32]={'\0'}; MQTT_SLEEP(1); sprintf(at_str, "AT^SYSINFO\r"); int ret; char resp_str[64]={'\0'}; int ready=0; am_printf("begin run mqtt echo task,client(%p)",&mqtt_client); ret = SendATCMDWaitResp(TEL_AT_CMD_ATP_10, at_str, 3, "^SYSINFO:",1,NULL, resp_str, sizeof(resp_str)); printf("%s: resp_str = %s, ret = %u\n",__FUNCTION__,resp_str,ret); if(strstr(resp_str, "^SYSINFO: 2,3") != NULL || strstr(resp_str, "^SYSINFO: 2,2") != NULL){ ready = 1; } while (!ready){ sleep(1); ret = SendATCMDWaitResp(TEL_AT_CMD_ATP_10, at_str, 3, "^SYSINFO:",1,NULL, resp_str, sizeof(resp_str)); printf("%s: resp_str = %s, ret = %u\n",__FUNCTION__,resp_str,ret); printf("len = %u, resp_str[10] = %c, resp_str[12] = %c, resp_str[16] = %c\n",strlen(resp_str),resp_str[10],resp_str[12],resp_str[16]); if(strstr(resp_str, "^SYSINFO: 2,3") != NULL || strstr(resp_str, "^SYSINFO: 2,2") != NULL){ ready = 1; } } printf("\n\n\nSuccess in the net \n\n\n"); ptr = 0; NetworkInit(&network); MQTTClientInit(&mqtt_client,&network,30000,sendBuf,sizeof(sendBuf),readBuf,sizeof(readBuf)); char* address = MQTT_IP; if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) { am_printf("Return code from network connect is %d\n", rc); } else { am_printf("NetworkConnect Connected\n"); } #if defined(MQTT_TASK) if((rc == MQTTStartTask(&mqtt_client)) != MQTT_ERR_SUCCESS) { am_printf("Return code from start tasks is %d\n",rc); } else { am_printf("MQTTStartTask success\n"); } #endif /** * Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 */ connectData.MQTTVersion = 4; connectData.clientID.cstring = MQTTCLIENT_ID; connectData.username.cstring = MQTTUSERNAME; connectData.password.cstring = MQTTPASSWORD; if((rc = MQTTConnect(&mqtt_client,&connectData)) != 0) { am_printf("Return code from Mqtt connect is %d\n",rc); } else { am_printf("Mqtt connected\n"); } if((rc = MQTTSubscribe(&mqtt_client,MQTT_SUB_TOPIC,2,messageArrived)) != 0) { am_printf("Return code from Mqtt subcribe is %d\n",rc); } while (++count) { am_printf("do while in echo task.."); MQTTMessage message; char payload[30]; message.qos = 1; message.retained = 0; message.payload = payload; sprintf(payload, "message number %d", count); message.payloadlen = strlen(payload); if ((rc = MQTTPublish(&mqtt_client, MQTT_PUB_TOPIC, &message)) != 0) am_printf("Return code from MQTT publish is %d\n", rc); #if !defined(MQTT_TASK) if((rc = MQTTYield(&mqtt_client,1000)) != 0) am_printf("Return code from yield is %d\n",rc); #endif MQTT_SLEEP(5); } } ```