MQTT Demo
```c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>
#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdint.h>
#include "teldef.h"
#include "osa.h"
#include "UART.h"
#include "sys.h"
#include "sdk_api.h"
/*
#include "fbf_parse.h"
#include "osa.h"
#include "atfota.h"
#include "cmux_api.h"
#include "mi_api.h"
#include "download.h"
#include "I2C_PMIC_Config.h"
#include "libhttpclient.h"
*/
#include "MQTTClient.h"
/* Route this file's am_printf through fatal_printf with a "MqttTest: " prefix. */
#undef am_printf
#define am_printf(fmt, args...) fatal_printf("MqttTest: "fmt, ##args)
/* Sleep for x seconds: OSATaskSleep is handed (x) * 200, i.e. this file assumes 200 ticks per second. */
#define sleep(x) OSATaskSleep((x) * 200)//second
/*
 * Size of the demo task's stack in bytes.
 * Parenthesized so the macro expands safely inside any larger expression
 * (for example as the right-hand operand of a division or modulo).
 */
#define _TASK_STACK_SIZE (1024*10)
/* Backing storage for the demo task's stack, sized in UINT32 words. */
static UINT32 _task_stack[_TASK_STACK_SIZE/sizeof(UINT32)];
/* OS handles created in Phase2Inits_exit(). */
static OSTaskRef _task_ref = NULL;
static OSATimerRef _timer_ref = NULL;
static OSFlagRef _flag_ref = NULL;
/* Flag bit raised by _timer_callback() each time the timer fires. */
#define TASK_TIMER_CHANGE_FLAG_BIT 0x01
static void _timer_callback(UINT32 tmrId);
static void _task(void *ptr);
/*
 * Connection settings used by _task().
 * NOTE(review): the broker address and the credentials below are hard-coded
 * in source; consider moving them to provisioning/configuration data.
 */
char MQTT_IP[] = "183.230.40.39"; /* OneNET MQTT server IP address */
int MQTT_PORT = 6002; /* OneNET MQTT port number */
char MQTTCLIENT_ID[] = "600324264"; /* OneNET device ID */
char MQTTUSERNAME[] = "324875"; /* OneNET product ID (the original note also lists 600324264) */
char MQTTPASSWORD[] = "hT=wtearqIsmztZ=S5CN4ttqOig="; /* OneNET device authentication info (APIKey) */
char MQTT_SUB_TOPIC[] = "mqtt/sample/#"; /* topic filter passed to MQTTSubscribe in _task() */
char MQTT_PUB_TOPIC[] = "mqtt/sample/mifi"; /* topic passed to MQTTPublish in _task() */
#if !MQTT_THREAD_ARG_FLAG
/*
 * The argument passed to a newly created thread was found to be unreliable,
 * so the client object is kept as a global instead. When
 * MQTT_THREAD_ARG_FLAG is non-zero, _task() declares its own local client.
 */
MQTTClient mqtt_client;
#endif
// Device boot-up hook called before Phase1Inits.
// If you have initialization work to do, you may implement it here,
// e.g. start your task or perform other setup.
extern void Phase1Inits_enter(void);
// Device boot-up hook called after Phase1Inits.
// If you have initialization work to do, you may implement it here,
// e.g. start your task or perform other setup.
extern void Phase1Inits_exit(void);
// Device boot-up hook called before Phase2Inits.
// If you have initialization work to do, you may implement it here,
// e.g. start your task or perform other setup.
extern void Phase2Inits_enter(void);
// Device boot-up hook called after Phase2Inits.
// If you have initialization work to do, you may implement it here,
// e.g. start your task or perform other setup.
extern void Phase2Inits_exit(void);
/* Boot hook before Phase1Inits: this demo has nothing to do at this stage. */
void Phase1Inits_enter(void)
{
    /* intentionally empty */
}
/* Boot hook after Phase1Inits: this demo has nothing to do at this stage. */
void Phase1Inits_exit(void)
{
    /* intentionally empty */
}
/* Boot hook before Phase2Inits: this demo has nothing to do at this stage. */
void Phase2Inits_enter(void)
{
    /* intentionally empty */
}
/*
 * Boot hook after Phase2Inits: creates the demo's event flag, timer and task,
 * then starts the periodic timer. Every OSA call is asserted to have returned
 * OS_SUCCESS, so a failure at boot is caught immediately rather than ignored.
 */
void Phase2Inits_exit(void)
{
    int ret;

    ret = OSAFlagCreate(&_flag_ref);
    ASSERT(ret == OS_SUCCESS);

    ret = OSATimerCreate(&_timer_ref);
    ASSERT(ret == OS_SUCCESS);

    /* Priority 150, running _task() on the statically allocated stack. */
    ret = OSATaskCreate(&_task_ref, _task_stack, _TASK_STACK_SIZE, 150, "test-task", _task, NULL);
    ASSERT(ret == OS_SUCCESS);

    /*
     * Initial delay and reschedule period are both 20 * 200 ticks, which is
     * 20 seconds at the 200 ticks-per-second scale used by this file's
     * sleep() macro. The start result is checked like the calls above.
     */
    ret = OSATimerStart(_timer_ref, 20 * 200, 20 * 200, _timer_callback, 0);
    ASSERT(ret == OS_SUCCESS);
}
/*
 * Subscription callback: logs the topic and payload of a received message.
 *
 * data - message descriptor supplied by the MQTT client; its topicName and
 *        message pointers are dereferenced without checks.
 *
 * "%.*s" requires an int precision followed by a char pointer. The length
 * and payload fields are cast explicitly so the call stays well defined
 * whatever their declared types are (in the Paho embedded client the payload
 * is a void pointer with a size_t length - confirm against this port's
 * MQTTClient.h). The precision bounds the print, so neither string needs to
 * be NUL-terminated.
 */
void messageArrived(MessageData* data)
{
    am_printf("Recv by topic:%.*s message:%.*s \n",
              (int)data->topicName->lenstring.len,
              (const char *)data->topicName->lenstring.data,
              (int)data->message->payloadlen,
              (const char *)data->message->payload);
}
/*
 * Timer expiry handler registered in Phase2Inits_exit().
 * ORs TASK_TIMER_CHANGE_FLAG_BIT into the demo's event flag; the timer id
 * argument is not used.
 */
static void _timer_callback(UINT32 tmrId)
{
    (void)tmrId;

    OSAFlagSet(_flag_ref,
               TASK_TIMER_CHANGE_FLAG_BIT,
               OSA_FLAG_OR);
}
/*
 * AT command helper defined elsewhere. _task() uses it to issue
 * "AT^SYSINFO\r" and then inspects the text left in out_str.
 * NOTE(review): the exact meaning of sATPInd, timeout (units), ok_fmt,
 * ok_flag, err_fmt and the return value is not visible in this file -
 * confirm against the SDK documentation before relying on them.
 */
extern int SendATCMDWaitResp(int sATPInd,char *in_str, int timeout, char *ok_fmt, int ok_flag,
char *err_fmt, char *out_str, int resplen);
/*
 * Returns non-zero when an AT^SYSINFO response contains one of the two status
 * prefixes this demo treats as "network ready".
 * NOTE(review): "2,3" / "2,2" presumably mean valid service with a usable
 * service domain - confirm against the module's AT command manual.
 */
static int sysinfo_reports_service(const char *resp)
{
    return strstr(resp, "^SYSINFO: 2,3") != NULL ||
           strstr(resp, "^SYSINFO: 2,2") != NULL;
}

/*
 * Demo task entry point.
 *
 *  1. Polls AT^SYSINFO once per second until the response reports service.
 *  2. Opens a network connection to MQTT_IP:MQTT_PORT and initialises the
 *     MQTT client with stack-allocated send/receive buffers.
 *  3. Connects with the configured client id / username / password and
 *     subscribes to MQTT_SUB_TOPIC with messageArrived() as the handler.
 *  4. Publishes "message number <n>" to MQTT_PUB_TOPIC in a loop, yielding
 *     to the client between publishes when no background MQTT task is used.
 *
 * ptr - task argument; unused.
 *
 * NOTE(review): as in the original flow, a failed connect or subscribe is
 * only logged and the task carries on to the next step; decide whether the
 * demo should retry or stop instead.
 */
static void _task(void *ptr)
{
#if MQTT_THREAD_ARG_FLAG
    /* Task-local client; otherwise the file-scope mqtt_client is used. */
    MQTTClient mqtt_client;
#endif
    Network network;
    unsigned char sendBuf[128], readBuf[128];
    int rc = 0, count = 0;
    int ret;
    int first_query = 1;
    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
    char at_str[32] = "AT^SYSINFO\r";
    char resp_str[64] = {'\0'};

    (void)ptr;

    MQTT_SLEEP(1);
    am_printf("begin run mqtt echo task,client(%p)", (void *)&mqtt_client);

    /* Wait for network service: query, log, and retry once per second. */
    for (;;) {
        ret = SendATCMDWaitResp(TEL_AT_CMD_ATP_10, at_str, 3, "^SYSINFO:", 1, NULL, resp_str, sizeof(resp_str));
        /* ret is an int, so it is printed with %d. */
        printf("%s: resp_str = %s, ret = %d\n", __FUNCTION__, resp_str, ret);
        if (!first_query) {
            /* Extra diagnostics on retries only; strlen() is cast to match %u. */
            printf("len = %u, resp_str[10] = %c, resp_str[12] = %c, resp_str[16] = %c\n",
                   (unsigned)strlen(resp_str), resp_str[10], resp_str[12], resp_str[16]);
        }
        if (sysinfo_reports_service(resp_str)) {
            break;
        }
        first_query = 0;
        sleep(1);
    }
    printf("\n\n\nSuccess in the net \n\n\n");

    NetworkInit(&network);
    /* 30000 is the client's command timeout (milliseconds in the Paho embedded client - confirm for this port). */
    MQTTClientInit(&mqtt_client, &network, 30000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf));

    rc = NetworkConnect(&network, MQTT_IP, MQTT_PORT);
    if (rc != 0) {
        am_printf("Return code from network connect is %d\n", rc);
    } else {
        am_printf("NetworkConnect Connected\n");
    }

#if defined(MQTT_TASK)
    /* Assign the result, so the real return code is both tested and logged. */
    rc = MQTTStartTask(&mqtt_client);
    if (rc != MQTT_ERR_SUCCESS) {
        am_printf("Return code from start tasks is %d\n", rc);
    } else {
        am_printf("MQTTStartTask success\n");
    }
#endif

    /* Version of MQTT to be used: 3 = 3.1, 4 = 3.1.1 */
    connectData.MQTTVersion = 4;
    connectData.clientID.cstring = MQTTCLIENT_ID;
    connectData.username.cstring = MQTTUSERNAME;
    connectData.password.cstring = MQTTPASSWORD;

    rc = MQTTConnect(&mqtt_client, &connectData);
    if (rc != 0) {
        am_printf("Return code from Mqtt connect is %d\n", rc);
    } else {
        am_printf("Mqtt connected\n");
    }

    /* Subscribe with QoS 2; incoming messages are handed to messageArrived(). */
    rc = MQTTSubscribe(&mqtt_client, MQTT_SUB_TOPIC, 2, messageArrived);
    if (rc != 0) {
        am_printf("Return code from Mqtt subcribe is %d\n", rc);
    }

    while (++count) {
        /* Zero-initialised so no field is handed to the client with an indeterminate value. */
        MQTTMessage message = {0};
        char payload[30];

        am_printf("do while in echo task..");

        message.qos = 1;
        message.retained = 0;
        message.payload = payload;
        /* Bounded formatting: the text always fits, but the buffer size is enforced anyway. */
        snprintf(payload, sizeof(payload), "message number %d", count);
        message.payloadlen = strlen(payload);

        rc = MQTTPublish(&mqtt_client, MQTT_PUB_TOPIC, &message);
        if (rc != 0) {
            am_printf("Return code from MQTT publish is %d\n", rc);
        }
#if !defined(MQTT_TASK)
        /* No background MQTT task: service the client from this task between publishes. */
        rc = MQTTYield(&mqtt_client, 1000);
        if (rc != 0) {
            am_printf("Return code from yield is %d\n", rc);
        }
#endif
        MQTT_SLEEP(5);
    }
}
```