热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

muduo库net源码分析十一(应用层缓冲区Buffer设计)

为什么需要有应用层缓冲区?muduo网络库使用IO复用,并且文件描述符使用非阻塞模式,如果使用阻塞模式那么read、write就会阻塞在

为什么需要有应用层缓冲区?

muduo网络库使用IO复用,并且文件描述符使用非阻塞模式,如果使用阻塞模式那么read、write就会阻塞在这些系统调用之上,这样一来即使其他文件描述符的IO到来也不能立刻去处理,也就不能最大限度的使用IO线程。

考虑一个常见场景:程序想通过 TCP 连接 发送 100k 字节的数据,但是在 write() 调用中,操作系统只接受了 80k 字节(受 TCP advertised window 的控制,细节见 TCPv1),你肯定不想在原地等待,因为不知道 会等多久(取决于对方什么时候接受数据,然后滑动 TCP 窗口)。程序应该尽快交出 控制权,返回 event loop。在这种情况下,剩余的 20k 字节数据怎么办? 所以需要一个应用层缓冲区。

接收到数据,存至input buffer,通知上层的应用程序,OnMessage(Buffer * buff) 回调,根据应用层协议判定是否是一个完整的包,codec,如果不是一条完整的消息,不会取走数据,也不会进行相应的处理,如果是一条完整的消息,将取走这条消息,并进行相应的处理。 怎么判断一条完整的消息是应用层的逻辑.

Buffer 的要求

Muduo Buffer 的设计考虑了常见的网络编程需求,我试图在易用性和性能之间

找一个平衡点,目前这个平衡点更偏向于易用性。 Muduo Buffer 的设计要点:

  • 对外表现为一块连续的内存 (char*, len),以方便客户代码的编写。

  • 其 size() 可以自动增长,以适应不同大小的消息。它不是一个 fixed size array

    (即 char buf[8192])。

  • 内部以 vector of char 来保存数据,并提供相应的访问函数。

    Buffer 其实像是一个 queue,从末尾写入数据,从头部读出数据。

#ifndef MUDUO_NET_BUFFER_H
#define MUDUO_NET_BUFFER_H#include
#include
#include #include #include
#include #include
#include
//#include // ssize_tnamespace muduo
{
namespace net
{/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <&#61; readerIndex <&#61; writerIndex <&#61; size
/// &#64;endcode
class Buffer : public muduo::copyable
{public:static const size_t kCheapPrepend &#61; 8;static const size_t kInitialSize &#61; 1024;Buffer(): buffer_(kCheapPrepend &#43; kInitialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){assert(readableBytes() &#61;&#61; 0);assert(writableBytes() &#61;&#61; kInitialSize);assert(prependableBytes() &#61;&#61; kCheapPrepend);}// default copy-ctor, dtor and assignment are finevoid swap(Buffer& rhs){buffer_.swap(rhs.buffer_);std::swap(readerIndex_, rhs.readerIndex_);std::swap(writerIndex_, rhs.writerIndex_);}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() &#43; readerIndex_; }const char* findCRLF() const{const char* crlf &#61; std::search(peek(), beginWrite(), kCRLF, kCRLF&#43;2);return crlf &#61;&#61; beginWrite() ? NULL : crlf;}const char* findCRLF(const char* start) const{assert(peek() <&#61; start);assert(start <&#61; beginWrite());const char* crlf &#61; std::search(start, beginWrite(), kCRLF, kCRLF&#43;2);return crlf &#61;&#61; beginWrite() ? NULL : crlf;}// retrieve returns void, to prevent// string str(retrieve(readableBytes()), readableBytes());// the evaluation of two functions are unspecifiedvoid retrieve(size_t len){assert(len <&#61; readableBytes());if (len (readableBytes()));}void append(const StringPiece& str){append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data&#43;len, beginWrite());hasWritten(len);}void append(const void* /*restrict*/ data, size_t len){append(static_cast(data), len);}// 确保缓冲区可写空间>&#61;len&#xff0c;如果不足则扩充void ensureWritableBytes(size_t len){if (writableBytes() &#61; len);}char* beginWrite(){ return begin() &#43; writerIndex_; }const char* beginWrite() const{ return begin() &#43; writerIndex_; }void hasWritten(size_t len){ writerIndex_ &#43;&#61; len; }////// Append int32_t using network endian///void appendInt32(int32_t x){int32_t be32 &#61; sockets::hostToNetwork32(x);append(&be32, sizeof be32);}void appendInt16(int16_t x){int16_t be16 &#61; sockets::hostToNetwork16(x);append(&be16, sizeof be16);}void appendInt8(int8_t x){append(&x, sizeof x);}////// Read int32_t from network endian////// Require: buf->readableBytes() >&#61; sizeof(int32_t)int32_t readInt32(){int32_t result &#61; peekInt32();retrieveInt32();return result;}int16_t readInt16(){int16_t result &#61; peekInt16();retrieveInt16();return result;}int8_t readInt8(){int8_t result &#61; peekInt8();retrieveInt8();return result;}////// Peek int32_t from network endian////// Require: buf->readableBytes() >&#61; sizeof(int32_t)int32_t peekInt32() const{assert(readableBytes() >&#61; sizeof(int32_t));int32_t be32 &#61; 0;::memcpy(&be32, peek(), sizeof be32);return sockets::networkToHost32(be32);}int16_t peekInt16() const{assert(readableBytes() >&#61; sizeof(int16_t));int16_t be16 &#61; 0;::memcpy(&be16, peek(), sizeof be16);return sockets::networkToHost16(be16);}int8_t peekInt8() const{assert(readableBytes() >&#61; sizeof(int8_t));int8_t x &#61; *peek();return x;}////// Prepend int32_t using network endian///void prependInt32(int32_t x){int32_t be32 &#61; sockets::hostToNetwork32(x);prepend(&be32, sizeof be32);}void prependInt16(int16_t x){int16_t be16 &#61; sockets::hostToNetwork16(x);prepend(&be16, sizeof be16);}void prependInt8(int8_t x){prepend(&x, sizeof x);}void prepend(const void* /*restrict*/ data, size_t len){assert(len <&#61; prependableBytes());readerIndex_ -&#61; len;const char* d &#61; static_cast(data);std::copy(d, d&#43;len, begin()&#43;readerIndex_);}// 收缩&#xff0c;保留reserve个字节void shrink(size_t reserve){// FIXME: use vector::shrink_to_fit() in C&#43;&#43; 11 if possible.Buffer other;other.ensureWritableBytes(readableBytes()&#43;reserve);other.append(toStringPiece());swap(other);}/// Read data directly into buffer.////// It may implement with readv(2)/// &#64;return result of read(2), &#64;c errno is savedssize_t readFd(int fd, int* savedErrno);private:char* begin(){ return &*buffer_.begin(); }const char* begin() const{ return &*buffer_.begin(); }void makeSpace(size_t len){if (writableBytes() &#43; prependableBytes() buffer_; // vector用于替代固定大小数组size_t readerIndex_; // 读位置size_t writerIndex_; // 写位置static const char kCRLF[]; // "\r\n"
};}
}#endif // MUDUO_NET_BUFFER_H

#include #include #include
#include using namespace muduo;
using namespace muduo::net;const char Buffer::kCRLF[] &#61; "\r\n";const size_t Buffer::kCheapPrepend;
const size_t Buffer::kInitialSize;// 结合栈上的空间&#xff0c;避免内存使用过大&#xff0c;提高内存使用率
// 如果有5K个连接&#xff0c;每个连接就分配64K&#43;64K的缓冲区的话&#xff0c;将占用640M内存&#xff0c;
// 而大多数时候&#xff0c;这些缓冲区的使用率很低
ssize_t Buffer::readFd(int fd, int* savedErrno)
{// saved an ioctl()/FIONREAD call to tell how much to read// 节省一次ioctl系统调用&#xff08;获取有多少可读数据&#xff09;char extrabuf[65536];struct iovec vec[2];const size_t writable &#61; writableBytes();// 第一块缓冲区vec[0].iov_base &#61; begin()&#43;writerIndex_;vec[0].iov_len &#61; writable;// 第二块缓冲区vec[1].iov_base &#61; extrabuf;vec[1].iov_len &#61; sizeof extrabuf;const ssize_t n &#61; sockets::readv(fd, vec, 2);if (n <0){*savedErrno &#61; errno;}else if (implicit_cast(n) <&#61; writable) //第一块缓冲区足够容纳{writerIndex_ &#43;&#61; n;}else // 当前缓冲区&#xff0c;不够容纳&#xff0c;因而数据被接收到了第二块缓冲区extrabuf&#xff0c;将其append至buffer{writerIndex_ &#61; buffer_.size();append(extrabuf, n - writable);}// if (n &#61;&#61; writable &#43; sizeof extrabuf)// {// goto line_30;// }return n;
}


推荐阅读
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 文件描述符、文件句柄与打开文件之间的关联解析
    本文详细探讨了文件描述符、文件句柄和打开文件之间的关系,通过具体示例解释了它们在操作系统中的作用及其相互影响。 ... [详细]
  • 本文详细探讨了VxWorks操作系统中双向链表和环形缓冲区的实现原理及使用方法,通过具体示例代码加深理解。 ... [详细]
  • 本教程涵盖OpenGL基础操作及直线光栅化技术,包括点的绘制、简单图形绘制、直线绘制以及DDA和中点画线算法。通过逐步实践,帮助读者掌握OpenGL的基本使用方法。 ... [详细]
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • 本文探讨了 Objective-C 中的一些重要语法特性,包括 goto 语句、块(block)的使用、访问修饰符以及属性管理等。通过实例代码和详细解释,帮助开发者更好地理解和应用这些特性。 ... [详细]
  • ###问题删除目录时遇到错误提示:rm:cannotremoveusrlocaltmp’:Directorynotempty即使用rm-rf,还是会出现 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • CentOS7源码编译安装MySQL5.6
    2019独角兽企业重金招聘Python工程师标准一、先在cmake官网下个最新的cmake源码包cmake官网:https:www.cmake.org如此时最新 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • Linux设备驱动程序:异步时间操作与调度机制
    本文介绍了Linux内核中的几种异步延迟操作方法,包括内核定时器、tasklet机制和工作队列。这些机制允许在未来的某个时间点执行任务,而无需阻塞当前线程,从而提高系统的响应性和效率。 ... [详细]
  • Codeforces Round #566 (Div. 2) A~F个人题解
    Dashboard-CodeforcesRound#566(Div.2)-CodeforcesA.FillingShapes题意:给你一个的表格,你 ... [详细]
author-avatar
曾家宏惠茹冠宇
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有