作者:酸葡萄洗澡她_606 | 来源:互联网 | 2023-08-28 09:20
muduo
中,类EventLoopThreadPool
的构造函数将成员numThreads_
设置为0,表示默认不开启主从Reactor模式,即单Reactor模式。
单Reactor
模式中,该Reactor
负责监听新连接的到来、套接字的可读可写。
通过在调用void TcpServer::start()
之前,调用EventLoopThreadPool::setThreadNum()
。即可开启主从Reactor模式。
类TcpServe
r的结构如下:
class TcpServer : noncopyable
{
public:...
private:EventLoop* loop_; ...std::shared_ptr<EventLoopThreadPool> threadPool_;...
};
TcpServer
构造时&#xff0c;传入已经构造好的EventLoop对象赋值给成员loop_
&#xff0c;loop_
运行在主线程中。
称这个EventLoop
为主Reactor&#xff0c;只会负责监听新的连接请求。
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(CHECK_NOTNULL(loop)), ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option &#61;&#61; kReusePort)), threadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),nextConnId_(1)
{acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
服务器启动时&#xff0c;会调用TcpServer::start()
&#xff0c;其中又会调用EventLoopThreadPool::start(const ThreadInitCallback& cb)
&#xff0c;用来初始化并运行子线程并保存在TcpServer::threadPool_
中&#xff0c;这些子线程中运行着EventLoop
的无限事件循环。称这些运行在EventLoopThread
中的EventLoop
为从Reactor。
void TcpServer::start()
{if (started_.getAndSet(1) &#61;&#61; 0){threadPool_->start(threadInitCallback_);assert(!acceptor_->listenning());loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{assert(!started_);baseLoop_->assertInLoopThread();started_ &#61; true;for (int i &#61; 0; i < numThreads_; &#43;&#43;i){char buf[name_.size() &#43; 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread* t &#61; new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop());}if (numThreads_ &#61;&#61; 0 && cb){cb(baseLoop_);}
}
对于已经连接的套接字&#xff0c;监听它们的事件&#xff0c;由从Reactor负责, 也即是运行在子线程中的EventLoop负责。
当需要一个从Reactor时&#xff0c;需要调用EventLoopThreadPool->getNextLoop()
;
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();EventLoop* ioLoop &#61; threadPool_->getNextLoop(); char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);&#43;&#43;nextConnId_;string connName &#61; name_ &#43; buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));...;
}
EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);EventLoop* loop &#61; baseLoop_;if (!loops_.empty()){loop &#61; loops_[next_];&#43;&#43;next_;if (implicit_cast<size_t>(next_) >&#61; loops_.size()){next_ &#61; 0;}}return loop;
}