最近有个项目要添加MQTT协议,根据协议,我们是MQTT客户端,底层已经实现了TCP/IP协议,
载荷是我们私有协议,现在要把载荷换成MQTT协议和用户定义的JSON包。看了下MQTT协议和
C库,如果想在两天之内实现,难度有点大,没有用过MQTT,不知道JSON是啥?通过对用户协
的解读和对MQTT协议查阅资料,目前想要快速实现,具体思路如下:
1、抓包分析MQTT数据包;
2、查看MQTT中文协议;
3、只实现登入、发布消息和心跳这三种类型包;
4、JSON包用sprintf,只实现用户定义的JSON包;
根据以上思路,需要工具:
1、Wireshark;
2、通信猫;
一、登入帧
固定包头:报文类型+剩余长度 10+剩余长度
可变包头:协议名(Protocol Name),协议级别
(Protocol Level),连接标志(Connect Flags)和保持连接(Keep Alive)
共10个字节
协议名称:00 04 4d 51 54 54 //协议名称
协议版本:04 //协议版本 3.1.1
连接标志:c2 //连接标志
保持连接:00 14 //保持连接,S为单位
const static uint8_t c_VariableHeader[] = {0x00,0x04,0x4d,0x51,0x54,0x54,0x04,0xC2,0x00,0x14};
载荷:客户端标识(必须唯一)+登入名(可以为空)+登入密码(可以为空)
两字节客户端标识长度+客户标识
两字节登入名长度+登入名
两字节登入密码长度+登入密码
//MQTT连接连接包
bool mqtt_connect(uint8_t* pchBuffer,uint16_t* phwSize)
{uint8_t chVariableHeader[sizeof(c_VariableHeader)];uint8_t chTemp[SERVICE_BUFFER_SIZE] = {0};byte_pipe_t tPipe;union{uint8_t chTemp[2];int16_t iTemp;}tTemp = {0};if(NULL == pchBuffer || NULL == phwSize){return false;}new_byte_pipe(&tPipe,chTemp,sizeof(chTemp));memcpy(chVariableHeader,c_VariableHeader,sizeof(c_VariableHeader));//设备ID{uint8_t chBuffer[20] = {0};memset(chBuffer,0x00,20);tTemp.iTemp = sprintf((char*)chBuffer,"Dome%d",eeprom_id_read());if(tTemp.iTemp > 0){enpipe_byte(&tPipe,tTemp.chTemp[1]);enpipe_byte(&tPipe,tTemp.chTemp[0]);enpipe_bytes(&tPipe,chBuffer,tTemp.iTemp);}}//用户名tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_name_read());if(tTemp.iTemp > 0){enpipe_byte(&tPipe,tTemp.chTemp[1]);enpipe_byte(&tPipe,tTemp.chTemp[0]);enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_name_read(),tTemp.iTemp);}else{chVariableHeader[7] = chVariableHeader[7] & 0x7F;}//登入密码tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_password_read());if(tTemp.iTemp > 0){enpipe_byte(&tPipe,tTemp.chTemp[1]);enpipe_byte(&tPipe,tTemp.chTemp[0]);enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_password_read(),tTemp.iTemp);}else{chVariableHeader[7] = chVariableHeader[7] & 0xBF;}//Keep Alive时间tTemp.iTemp = eeprom_heart_beat_min_read() * 60;chVariableHeader[8] = tTemp.chTemp[1];chVariableHeader[9] = tTemp.chTemp[0];//固定包头pchBuffer[0] = 0x10;if(get_byte_pipe_num(&tPipe)+10 > (sizeof(chTemp) - 3)){return false;}//不支持大于256的数据if(get_byte_pipe_num(&tPipe)+10 > 127){pchBuffer[1] = 0x00;pchBuffer[2] = 0x00;//低字节在前,高字节在后pchBuffer[1] = (get_byte_pipe_num(&tPipe)+10) & 0x007F;pchBuffer[1] = pchBuffer[1] | 0x80;pchBuffer[2] = (get_byte_pipe_num(&tPipe)+10) >> 7;//可变帧头memcpy(pchBuffer+3,chVariableHeader,10);//Payloadmemcpy(pchBuffer+3+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));*phwSize = 3+10+get_byte_pipe_num(&tPipe);}else{pchBuffer[1] = get_byte_pipe_num(&tPipe)+10;//可变帧头memcpy(pchBuffer+2,chVariableHeader,10);//Payloadmemcpy(pchBuffer+2+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));*phwSize = 2+10+get_byte_pipe_num(&tPipe);}return true;
}
二、发布消息
发布消息分为QoS0、QoS1、QoS2,其中QoS0没有回复帧,QoS1需要确认帧,QoS2实现
比较复杂,暂不实现。
QoS0不包含报文标识
30+剩余长度+主题长度+主题+用户载荷
//MQTT发送消息包
//没有确认帧
bool mqtt_publish_qos0(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{uint16_t j=0;static union{uint8_t chFrameNum[2];uint16_t hwFrameNum;}s_tFrameNum = {0};union{uint8_t chTemp[2];int16_t iTemp;}tTemp = {0};if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){return false;}tTemp.iTemp = hwJsonSize;j = strlen(pcPublishTopic);if(tTemp.iTemp+j+5 > hwBufferSize){TRACE_ERROR("mqtt_publish_qos0 size error\r\n");return false;}//固定包头pchBuffer[0] = 0x30;//剩余长度//不支持大于256的数据if(tTemp.iTemp+j+2 > 127){pchBuffer[1] = 0x00;pchBuffer[2] = 0x00;//低字节在前,高字节在后pchBuffer[1] = (tTemp.iTemp+j+2) & 0x007F;pchBuffer[1] = pchBuffer[1] | 0x80;pchBuffer[2] = (tTemp.iTemp+j+2) >> 7;//主题长度pchBuffer[3] = (j >> 8) & 0x00FF;pchBuffer[4] = (j >> 0) & 0x00FF;//主题memcpy(pchBuffer+5,pcPublishTopic,j);//报文标识//pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];//pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0]; //载荷memcpy(pchBuffer+j+5,pchJson,tTemp.iTemp);*phwSize = tTemp.iTemp+j+5;}else{pchBuffer[1] = tTemp.iTemp+j+2;//主题长度pchBuffer[2] = (j >> 8) & 0x00FF;pchBuffer[3] = (j >> 0) & 0x00FF;//主题memcpy(pchBuffer+4,pcPublishTopic,j);//报文标识//pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];//pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0]; //载荷memcpy(pchBuffer+j+4,pchJson,tTemp.iTemp);*phwSize = tTemp.iTemp+j+4;}s_tFrameNum.hwFrameNum++;return true;
}
QoS1需要服务器发送确认帧
32+剩余长度+主题长度+主题+报文标识+用户载荷
//MQTT发送消息包
//有确认帧
bool mqtt_publish_qos1(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{uint16_t j=0;static union{uint8_t chFrameNum[2];uint16_t hwFrameNum;}s_tFrameNum = {0};union{uint8_t chTemp[2];int16_t iTemp;}tTemp = {0};if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){return false;}tTemp.iTemp = hwJsonSize;j = strlen(pcPublishTopic);if(tTemp.iTemp+j+5+2 > hwBufferSize){TRACE_ERROR("mqtt_publish_qos1 size error\r\n");return false;}//固定包头pchBuffer[0] = 0x32;//剩余长度if(tTemp.iTemp+j+4 > 127){pchBuffer[1] = 0x00;pchBuffer[2] = 0x00;//低字节在前,高字节在后pchBuffer[1] = (tTemp.iTemp+j+4) & 0x007F;pchBuffer[1] = pchBuffer[1] | 0x80;pchBuffer[2] = (tTemp.iTemp+j+4) >> 7;//主题长度pchBuffer[3] = (j >> 8) & 0x00FF;pchBuffer[4] = (j >> 0) & 0x00FF;//主题memcpy(pchBuffer+5,pcPublishTopic,j);//报文标识pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0]; //载荷memcpy(pchBuffer+j+5+2,pchJson,tTemp.iTemp);*phwSize = tTemp.iTemp+j+5+2;}else{pchBuffer[1] = tTemp.iTemp+j+4;//主题长度pchBuffer[2] = (j >> 8) & 0x00FF;pchBuffer[3] = (j >> 0) & 0x00FF;//主题memcpy(pchBuffer+4,pcPublishTopic,j);//报文标识pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0]; //载荷memcpy(pchBuffer+j+4+2,pchJson,tTemp.iTemp);*phwSize = tTemp.iTemp+j+4+2;}s_tFrameNum.hwFrameNum++;return true;
}
三、心跳
发送 C0 00
收到 D0 00
//MQTT心跳包
bool mqtt_pingresp(uint8_t* pchBuffer,uint16_t* phwSize)
{if(NULL == pchBuffer || NULL == phwSize){return false;}pchBuffer[0] = 0xC0;pchBuffer[1] = 0x00;*phwSize = 2;return true;
}
四、JSON包
JSON包就是一串用户定义的字符串,用sprintf实现即可。
举例:定位
uint16_t mqtt_location_publish(uint32_t wID,char* pcLocation,uint8_t chSize,char* pchBuffer)
{int16_t i=0;if(NULL == pcLocation || !chSize || NULL == pchBuffer){return 0;}i = sprintf(pchBuffer,"{\"nodeIp\":\"%u\",\"baseIp\":\"%u\",\"location\":\"",wID,eeprom_id_read());i = (i>0)?i:0;if(!i){return 0;}memcpy(pchBuffer+i,pcLocation,chSize);i+=chSize;pchBuffer[i] = '"';i+=1;pchBuffer[i] = '}';i+=1;return i;
}
根据以上思路,在TCP/IP之上实现MQTT的客户端,两天就实现了。
《--------------------------------------------------------------------------------------------------------------------------》
JSON(Javascript Object Notation)是一种轻量级的数据交换格式.易于阅读和理解,也易于机器解析和生成.JSON采用独立于语言的文本格式,使用了类似于C语言家族的习惯(包括C,C++,C#,Java, Javascript, Perl, Python等).这些特性使得JSON成为理想的数据交换语言.
一 JSON构建于两种结构:
二 JSON的形式
对象是一个无序的"'名称/值'对"集合.一个对象以“{”(左括号)开始,“}”(右括号)结束。每个“名称”后跟一个“:”(冒号);“‘名称/值’ 对”之间使用“,”(逗号)分隔。
2. 数组
数组是值(value)的有序集合。一个数组以“[”(左中括号)开始,“]”(右中括号)结束。值之间间使用“,”(逗号)分隔。
三 值的内容
值(value)可以是双引号括起来的字符串(string)、数值(number)、true、false、 null、对象(object)或者数组(array)。这些结构可以嵌套。
字符串(string)是由双引号包围的任意数量Unicode字符的集合,使用反斜线转义。一个字符(character)即一个单独的字符串(character string)。
字符串(string)与C或者Java的字符串非常相似。
数值(number)也与C或者Java的数值非常相似。除去未曾使用的八进制与十六进制格式。除去一些编码细节。
四 范例
注意:字符串一定要用双引号括起来
数组中可以嵌套数组和对象
{"name": "BeJson","url": "http://www.bejson.com","page": 88,"isNonProfit": true,"address": {"street": "科技园路.","city": "江苏苏州","country": "中国"},"links": [{"name": "Google","url": "http://www.google.com"},{"name": "Baidu","url": "http://www.baidu.com"},{"name": "SoSo","url": "http://www.SoSo.com"}]}