为什么需要有应用层缓冲区?
muduo网络库使用IO复用,并且文件描述符使用非阻塞模式,如果使用阻塞模式那么read、write就会阻塞在这些系统调用之上,这样一来即使其他文件描述符的IO到来也不能立刻去处理,也就不能最大限度的使用IO线程。
考虑一个常见场景:程序想通过 TCP 连接 发送 100k 字节的数据,但是在 write() 调用中,操作系统只接受了 80k 字节(受 TCP advertised window 的控制,细节见 TCPv1),你肯定不想在原地等待,因为不知道 会等多久(取决于对方什么时候接受数据,然后滑动 TCP 窗口)。程序应该尽快交出 控制权,返回 event loop。在这种情况下,剩余的 20k 字节数据怎么办? 所以需要一个应用层缓冲区。
接收到数据,存至input buffer,通知上层的应用程序,OnMessage(Buffer * buff) 回调,根据应用层协议判定是否是一个完整的包,codec,如果不是一条完整的消息,不会取走数据,也不会进行相应的处理,如果是一条完整的消息,将取走这条消息,并进行相应的处理。 怎么判断一条完整的消息是应用层的逻辑.
Buffer 的要求
Muduo Buffer 的设计考虑了常见的网络编程需求,我试图在易用性和性能之间
找一个平衡点,目前这个平衡点更偏向于易用性。 Muduo Buffer 的设计要点:
-
对外表现为一块连续的内存 (char*, len),以方便客户代码的编写。
-
其 size() 可以自动增长,以适应不同大小的消息。它不是一个 fixed size array
(即 char buf[8192])。
-
内部以 vector of char 来保存数据,并提供相应的访问函数。
Buffer 其实像是一个 queue,从末尾写入数据,从头部读出数据。
#ifndef MUDUO_NET_BUFFER_H
#define MUDUO_NET_BUFFER_H#include
#include
#include #include #include
#include #include
#include
//#include // ssize_tnamespace muduo
{
namespace net
{/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <&#61; readerIndex <&#61; writerIndex <&#61; size
/// &#64;endcode
class Buffer : public muduo::copyable
{public:static const size_t kCheapPrepend &#61; 8;static const size_t kInitialSize &#61; 1024;Buffer(): buffer_(kCheapPrepend &#43; kInitialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){assert(readableBytes() &#61;&#61; 0);assert(writableBytes() &#61;&#61; kInitialSize);assert(prependableBytes() &#61;&#61; kCheapPrepend);}// default copy-ctor, dtor and assignment are finevoid swap(Buffer& rhs){buffer_.swap(rhs.buffer_);std::swap(readerIndex_, rhs.readerIndex_);std::swap(writerIndex_, rhs.writerIndex_);}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() &#43; readerIndex_; }const char* findCRLF() const{const char* crlf &#61; std::search(peek(), beginWrite(), kCRLF, kCRLF&#43;2);return crlf &#61;&#61; beginWrite() ? NULL : crlf;}const char* findCRLF(const char* start) const{assert(peek() <&#61; start);assert(start <&#61; beginWrite());const char* crlf &#61; std::search(start, beginWrite(), kCRLF, kCRLF&#43;2);return crlf &#61;&#61; beginWrite() ? NULL : crlf;}// retrieve returns void, to prevent// string str(retrieve(readableBytes()), readableBytes());// the evaluation of two functions are unspecifiedvoid retrieve(size_t len){assert(len <&#61; readableBytes());if (len (readableBytes()));}void append(const StringPiece& str){append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data&#43;len, beginWrite());hasWritten(len);}void append(const void* /*restrict*/ data, size_t len){append(static_cast(data), len);}// 确保缓冲区可写空间>&#61;len&#xff0c;如果不足则扩充void ensureWritableBytes(size_t len){if (writableBytes() &#61; len);}char* beginWrite(){ return begin() &#43; writerIndex_; }const char* beginWrite() const{ return begin() &#43; writerIndex_; }void hasWritten(size_t len){ writerIndex_ &#43;&#61; len; }////// Append int32_t using network endian///void appendInt32(int32_t x){int32_t be32 &#61; sockets::hostToNetwork32(x);append(&be32, sizeof be32);}void appendInt16(int16_t x){int16_t be16 &#61; sockets::hostToNetwork16(x);append(&be16, sizeof be16);}void appendInt8(int8_t x){append(&x, sizeof x);}////// Read int32_t from network endian////// Require: buf->readableBytes() >&#61; sizeof(int32_t)int32_t readInt32(){int32_t result &#61; peekInt32();retrieveInt32();return result;}int16_t readInt16(){int16_t result &#61; peekInt16();retrieveInt16();return result;}int8_t readInt8(){int8_t result &#61; peekInt8();retrieveInt8();return result;}////// Peek int32_t from network endian////// Require: buf->readableBytes() >&#61; sizeof(int32_t)int32_t peekInt32() const{assert(readableBytes() >&#61; sizeof(int32_t));int32_t be32 &#61; 0;::memcpy(&be32, peek(), sizeof be32);return sockets::networkToHost32(be32);}int16_t peekInt16() const{assert(readableBytes() >&#61; sizeof(int16_t));int16_t be16 &#61; 0;::memcpy(&be16, peek(), sizeof be16);return sockets::networkToHost16(be16);}int8_t peekInt8() const{assert(readableBytes() >&#61; sizeof(int8_t));int8_t x &#61; *peek();return x;}////// Prepend int32_t using network endian///void prependInt32(int32_t x){int32_t be32 &#61; sockets::hostToNetwork32(x);prepend(&be32, sizeof be32);}void prependInt16(int16_t x){int16_t be16 &#61; sockets::hostToNetwork16(x);prepend(&be16, sizeof be16);}void prependInt8(int8_t x){prepend(&x, sizeof x);}void prepend(const void* /*restrict*/ data, size_t len){assert(len <&#61; prependableBytes());readerIndex_ -&#61; len;const char* d &#61; static_cast(data);std::copy(d, d&#43;len, begin()&#43;readerIndex_);}// 收缩&#xff0c;保留reserve个字节void shrink(size_t reserve){// FIXME: use vector::shrink_to_fit() in C&#43;&#43; 11 if possible.Buffer other;other.ensureWritableBytes(readableBytes()&#43;reserve);other.append(toStringPiece());swap(other);}/// Read data directly into buffer.////// It may implement with readv(2)/// &#64;return result of read(2), &#64;c errno is savedssize_t readFd(int fd, int* savedErrno);private:char* begin(){ return &*buffer_.begin(); }const char* begin() const{ return &*buffer_.begin(); }void makeSpace(size_t len){if (writableBytes() &#43; prependableBytes() buffer_; // vector用于替代固定大小数组size_t readerIndex_; // 读位置size_t writerIndex_; // 写位置static const char kCRLF[]; // "\r\n"
};}
}#endif // MUDUO_NET_BUFFER_H
#include #include #include
#include using namespace muduo;
using namespace muduo::net;const char Buffer::kCRLF[] &#61; "\r\n";const size_t Buffer::kCheapPrepend;
const size_t Buffer::kInitialSize;// 结合栈上的空间&#xff0c;避免内存使用过大&#xff0c;提高内存使用率
// 如果有5K个连接&#xff0c;每个连接就分配64K&#43;64K的缓冲区的话&#xff0c;将占用640M内存&#xff0c;
// 而大多数时候&#xff0c;这些缓冲区的使用率很低
ssize_t Buffer::readFd(int fd, int* savedErrno)
{// saved an ioctl()/FIONREAD call to tell how much to read// 节省一次ioctl系统调用&#xff08;获取有多少可读数据&#xff09;char extrabuf[65536];struct iovec vec[2];const size_t writable &#61; writableBytes();// 第一块缓冲区vec[0].iov_base &#61; begin()&#43;writerIndex_;vec[0].iov_len &#61; writable;// 第二块缓冲区vec[1].iov_base &#61; extrabuf;vec[1].iov_len &#61; sizeof extrabuf;const ssize_t n &#61; sockets::readv(fd, vec, 2);if (n <0){*savedErrno &#61; errno;}else if (implicit_cast(n) <&#61; writable) //第一块缓冲区足够容纳{writerIndex_ &#43;&#61; n;}else // 当前缓冲区&#xff0c;不够容纳&#xff0c;因而数据被接收到了第二块缓冲区extrabuf&#xff0c;将其append至buffer{writerIndex_ &#61; buffer_.size();append(extrabuf, n - writable);}// if (n &#61;&#61; writable &#43; sizeof extrabuf)// {// goto line_30;// }return n;
}