作者一直强调的一个概念叫做one loop per thread
,撇开多线程不谈,本篇博文将学习,怎么将传统的I/O复用poll/epoll
封装到C++ 类中。
1.I/O复用
复习使用poll/epoll
进行I/O复用的一些编程内容。
使用poll
对于一个文件描述符fd来说,我们将通过struct pollfd
来设置我们关注的事件event,并在通过poll调用返回获取活跃的事件revent。比如说(伪代码):
struct pollfd pfds[2];
fd1 = GET_FD();
fd2 = GET_FD();
pfds[0].fd=fd1;
pfds[0].events=POLLIN;
while(true)
{
ret=poll(pfds,pfds中关心的描述符数目,TIMEOUT);
if(pfds[0].revents & POLLOUT)
{
}
}
使用epoll
epoll就要复杂一些了,主要由epoll_create
、epoll_ctl
、epoll_wait
函数组成,比如说(伪代码):
epollfd=epoll_create()
struct epoll_event ev;
ev.event=EPOLL_IN;
ev.data.fd=fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
while(true)
{
epoll_wait();
handle_event();
}
2.什么都不做的EventLoop
本文代码参见https://github.com/chenshuo/recipes
one loop per thread表示每个线程只能有一个EventLoop对象,怎么来保证呢?
作者用__thread
关键字修饰的t_loopInThisThread
来存储对象指针(它的值在多线程环境下互不干扰),每当EventLoop
进行构造的时候,判断该指针是否为NULL
也就是在之前有没有被初始化过。
__thread EventLoop* t_loopInThisThread = 0;
EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid())
{
LOG_TRACE <<"EventLoop created " <<this <<" in thread " <
if (t_loopInThisThread)
{
LOG_FATAL <<"Another EventLoop " < <<" exists in this thread " < }
else
{
t_loopInThisThread = this;
}
}
对于一个简单的EventLoop来说,poll调用封装在EventLoop::loop()
中,比如说:
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
::poll(NULL, 0, 5*1000);
LOG_TRACE <<"EventLoop " <<this <<" stop looping";
}
3.Reactor关键结构
主要介绍Channel
类(用于对描述符及事件的封装和事件的分发)Poller
类(主要对poll / epoll调用的封装,但只是获取活跃事件并不负责事件分发)
我一直思考的问题就在于Channel
、Poller
、EventLoop
是如何协调工作的,所以,先从一个示例看起。
muduo::EventLoop* g_loop;
void timeout()
{
printf("Timeout!\n");
g_loop->quit();
}
int main()
{
muduo::EventLoop loop;
g_loop = &loop;
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
muduo::Channel channel(&loop, timerfd);
channel.setReadCallback(timeout);
channel.enableReading();
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 5;
::timerfd_settime(timerfd, 0, &howlong, NULL);
loop.loop();
::close(timerfd);
}
程序执行的结果是5秒之后timefd可读将执行回调函数,并关闭loop。
现在我们来阅读相关源码,从上面的注释,首先,需要构造一个Channel对象:
Channel::Channel(EventLoop* loop, int fdArg)
: loop_(loop),
fd_(fdArg),
events_(0),
revents_(0),
index_(-1)
{
}
之后设置回调,这里用到boost::function和boost::bind就不再详细描述。
enableReading
干了两件事情:
1、设置关心的事情为POLL_IN
,事件可读。
2、将fd添加到pollfd数组中。
第2步将依次调用Channel::update ---> EventLoop::updateChannel --->Poller::updateChannel
分别来看看这些函数:
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
void Poller::updateChannel(Channel* channel)
{
assertInLoopThread();
LOG_TRACE <<"fd = " <fd() <<" events = " <events();
if (channel->index() <0) {
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
} else {
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx <static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
pfd.fd = -1;
}
}
}
到目前位置,我们已经将我们关心的描述符及事件都设置好了,现在就需要进行真正的poll调用了,也就是loop
循环
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
}
LOG_TRACE <<"EventLoop " <<this <<" stop looping";
looping_ = false;
}
EventLoop
类中保存了一个vector
类型的activeChannels
,用来获取活跃的事件,因为从上文Poller类中的一个channel变量(map
)就可以看出Channel和fd是可以一一对应的。
再来看看Poller封装的poll调用。
Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
LOG_INFO< Timestamp now(Timestamp::now());
if (numEvents > 0) {
LOG_TRACE <" events happended"