UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP - IOCP下的分包及重组)
Packet.h
- #include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG
- #include
- using
namespace
std;
- //先定义包头
- typedef
struct
_HeadExt
- {
- //每帧所有分片公用信息
- char
flag[5];
//可以任意字符或数字或其他方法,用来标志我们发送的数据
- UINT
m_nSeqNumber;
//本帧序数
- USHORT
m_nTotalFragment;
//本帧数据可分成的总片数
- USHORT
m_nTotalSize;
//本帧数据总长度
- //每片各自信息
- USHORT
m_nFragmentIndex;
//每帧分片序数,0 1 2 ... ,/
- USHORT
m_usPayloadSize;
//本片数据长度
- USHORT
m_usFragOffset;
//本片数据相对总数据的偏移量
- USHORT
m_bLastFragment;
//是否最后一帧
- //上 述数据有些重复,可以精简,有时间再修改
- }HeadExt;
- //从socket接收到的数据
- class
PacketIn
- {
- public
:
- PacketIn(char
* lpszBuffer=NULL,
UINT
usBufferSize=0,
UINT
nDataSize=0);
- virtual
~PacketIn();
- HeadExt head;
- BYTE
* m_lpszBuffer;
- ULONG
ip;
- UINT
port;
- CVLInetAddr addr;
- BOOL
Normalize();
- UINT
m_nDataLen;
- UINT
m_usBufferSize;
- };
- //接收到数据后开始重组使用的一个中间缓冲区,多个PacketIn合成一个Packet
- //发送时从一个Packet分割出多片,不过本程序里直接发送,没有封成Packet
- class
Packet
- {
- public
:
- enum
{ TIMEOUT_LOCKFRAGMENTS = 1000 };
- Packet( );
- ~Packet();
- void
reset();
- void
Set(
const
CVLInetAddr& iaFrom,
UINT
nSeqNumber );
- BOOL
InsertFragment(PacketIn*
const
pFragment);
- bool
m_bUsed;
- int
recvedpacks;
- int
recvedbytes;
- int
seqnum;
- BYTE
* m_pBuffer;
- //测试用程序,直接访问了类的变量,没有封装成函数。上述变量正常应该写成 SetXXX ,GetXXX
- private
:
- BOOL
IsFragComplete()
const
;
- ULONG
m_ip;
- USHORT
m_port;
- UINT
m_nSeqNumber;
- vector<int
> SeqNumberList;
- };
Packet.cpp
- #include "Packet.h"
- PacketIn::PacketIn(char
* lpszBuffer
/*=NULL*/
,
UINT
usBufferSize
/*=0*/
,
UINT
nDataSize
/*=0*/
)
- {
- m_lpszBuffer = (BYTE
*)lpszBuffer;
- m_usBufferSize = usBufferSize; //数据最大长度,其实没什么用,已经设置了这个值最大为64000
- m_nDataLen = nDataSize;
- }
- PacketIn::~PacketIn()
- {
- }
- BOOL
PacketIn::Normalize()
//判断收到的数据是否有效
- {
- const
USHORT
usHeaderSize =
sizeof
(HeadExt);
- if
( !m_lpszBuffer || m_usBufferSize < usHeaderSize )
//没什么用
- {
- return
FALSE;
- }
- HeadExt* pHeader = (HeadExt*)( m_lpszBuffer );
- if
( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize )
- {
- return
FALSE;
- }
- head = *pHeader;
- if
( pHeader->m_usPayloadSize > 0 )
- {
- memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize );
- }
- return
TRUE;
- }
- /////////////
- //这里是重组数据的临时缓冲,只看这里必然迷惑,先看socket收数据那里,再回来查看
- void
Packet::Set(
const
CVLInetAddr& iaFrom,
UINT
nSeqNumber )
- {
- m_iaFrom = iaFrom;
- m_nSeqNumber = nSeqNumber;
- if
(m_pBuffer)
- delete
[]m_pBuffer;
- }
- void
Packet::reset()
- {
- m_bUsed = false
;
- recvedpacks = 0;
- seqnum = 0;
- recvedbytes = 0;
- m_pBuffer = NULL;
- m_pBuffer = new
BYTE
[64000];
- SeqNumberList.clear();
- }
- Packet::Packet()
- {
- //calculate the ttl
- //m_nTTL = RUDP_REASSEMBLE_TIMEOUT*CRudpPeer::PR_SLOWHZ;
- reset();
- }
- Packet::~Packet()
- {
- if
(m_pBuffer)
- delete
[]m_pBuffer;
- }
- BOOL
Packet::InsertFragment(PacketIn*
const
pFragment)
- {
- int
nSize = SeqNumberList.size();
- for
(
int
i = 0; i< nSize ;i ++)
- {
- if
(nSize ==SeqNumberList[i] )
//收到重复数据包
- {
- return
FALSE;
- }
- }
- SeqNumberList.push_back(pFragment->head.m_nFragmentIndex);
- memcpy( m_pBuffer + pFragment->head.m_usFragOffset,
- pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize);
- recvedbytes += pFragment->head.m_usPayloadSize;
- recvedpacks++;
- m_bUsed = true
;
- CString str;
- str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d/r/n",
- pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment,
- pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset,
- pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize);
- OutputDebugString(str);
- if
(recvedbytes == pFragment->head.m_nTotalSize )
- return
TRUE;
- return
FALSE;
- }
发送时的操作:
- #iclude "packet.h"
- const
int
RTP_SPLIT_PACKSIZE = 1300;
- //udp一般最大包为1500,一般这里都设置为1400,当然1300也没错
- LONGLONG
index = rand();
- //帧序数,从一个随机数开始,我机器上生成的是41
- //分包发送,逻辑相当简单,按最常规的方法分割发送
- void
Send(
BYTE
* pBuff,
UINT
nLen,
ULONG
ip,
USHORT
port)
- {
- HeadExt head;
- strcpy(head.flag ,"aaaa"
);
- head.m_nSeqNumber = index++; //帧序数
- head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE;
- head.m_nTotalFragment += nLen%RTP_SPLIT_PACKSIZE?1:0;
- //整除的话就是0,否则多出一个尾数据
- head.m_nFragmentIndex = 0;//第一个分段从0开始
- head.m_nTotalSize = nLen;
- head.m_bLastFragment = 0;
- head.m_usFragOffset = 0;
- char
tem[RTP_SPLIT_PACKSIZE+
sizeof
(HeadExt)];
- if
(head.m_nTotalFragment == 1)
- {
- memcpy(tem,&head,sizeof
(HeadExt));
- memcpy(tem+sizeof
(HeadExt),pBuff,nLen);
- head.m_usPayloadSize = nLen;
- //用UDP发送,常规做法,先对UDP做一个封装的类,这里调用类对象。本代码只说明原理,类似伪代友
- sendto(tem,nLen+sizeof
(HeadExt),ip,port);
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n"
,
- head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
- head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);
- OutputDebugString(str);
- return
;
- }
- int
i = 0;
- for
( i = 0; i < head.m_nTotalFragment-1; i++)
//开始分割,最后一个单独处理
- {
- head.m_bLastFragment = 0;
- head.m_nFragmentIndex = i;
- head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;
- head.m_usPayloadSize = RTP_SPLIT_PACKSIZE;
- memcpy(tem,&head,sizeof
(HeadExt));
- memcpy(tem+sizeof
(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE);
- sendto(tem,nLen+sizeof
(HeadExt),ip,port);
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n"
,
- head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
- head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);
- OutputDebugString(str);
- }
- head.m_bLastFragment = 1;
- head.m_nFragmentIndex = i;
- head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;
- head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE);
- memcpy(tem,&head,sizeof
(HeadExt));
- memcpy(tem+sizeof
(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE));
- sendto(tem,nLen+sizeof
(HeadExt),ip,port);
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n"
,head.m_nSeqNumber,
- head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset,
- head.m_nTotalSize,head.m_usPayloadSize);
- OutputDebugString(str);
- }
接收数据的回调
//代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去
//因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到//这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。
- void
RecvCallback(
BYTE
* pBuff,
UINT
nLen,
ULONG
ip,
USHORT
port)
- {
- if
(nLen <
sizeof
(HeadExt))
- return
;
HeadExt* pHead = (HeadExt*)pBuff;
- CString str = pHead->flag;
- if
(str !=
"aaaa"
)
- {
- return
;
- }
- if
(pHead->m_nTotalFragment == 1)
//只有一帧,不分片
- {
- //回调,上层处理
- if
(m_pfDispatch)
- m_pfDispatch(pBuff+sizeof
(HeadExt),nLen-
sizeof
(HeadExt),ip,port,m_lpParam);
- return
;
- }
- PacketIn data( (char
*)pBuff, 64000, nLen );
- if
( !data.Normalize() )
- return
;
- if
( data.head.m_nTotalFragment>1 )
- {
- Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber );
- if
( pTemp )
- {
- if
( pTemp ->InsertFragment( &data ) )
- {
- m_pfDispatch(pTemp ->m_pBuffer,
- pTemp ->recvedbytes,ip,port,m_lpParam);
- }
- }//end of if
- }
- }
//上面用到的一些变量可以在一个.h里面定义,比如:
Packet TempPacket[16];//分配一个数组,每个节点是一个重组后临时缓冲
Packet* GetPartialPacket(const CVLInetAddr& iaFrom, UINT nSeqNumber);//从数组里取出一个缓冲节点
- //根据这几个关键字查找,不过只用
- //到了nSeqNumber,要是3人以上的视频聊天,ip和port是必要的
- Packet* GetPartialPacket(const
ULONG
ip,
USHORT
port,
UINT
nSeqNumber)
- {
- Packet* tmp = NULL;
- int
i=0;
- while
(tmp==NULL && i<16)
- {
- //该包所属的帧已有其它数据包到达
- if
(TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0)
- {
- tmp = &TempPacket[i];
- break
;
- }
- i++;
- }
- if
(tmp == NULL)
//新的帧
- {
- //查找空闲元素
- for
(i=0;i<16;i++)
- {
- if
(!TempPacket[i].m_bUsed)
- break
;
- }
- if
(i>=16)
- {
- //没有空闲的元素,丢掉一个最早
- tmp = &TempPacket[0];
- int
j = 0;
- for
(i=1;i<16;i++)
- {
- if
(TempPacket[i].seqnum < tmp->seqnum)
- {
- tmp = &TempPacket[i];
- j = i;
- }
- }
- //找到最早的一帧
- if
(tmp->m_pBuffer)
- {
- delete
[]tmp->m_pBuffer;
- tmp->reset();
- }
- }
- else
- tmp = &TempPacket[i];
- }
- InsertFragment
- tmp->m_bUsed = true
;
- tmp->seqnum = nSeqNumber;
- return
tmp;
- }
整个示例最重要的是两个函数:
GetPartialPacket:取到一个临时缓冲节点
InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧
当然发送方也要按一定规则分割发送。