github:https://github.com/froghui/yolanda
I/O模型和多线程模型实现
在我们的设计中,main reactor 线程是一个 acceptor 线程,这个线程一旦创建,会以 event_loop 形式阻塞在 event_dispatcher 的 dispatch 方法上,实际上,它在等待监听套接字上的事件发生,也就是已完成的连接,一旦有连接完成,就会创建出连接对象 tcp_connection,以及 channel 对象等。
当用户期望使用多个 sub-reactor 子线程时,主线程会创建多个子线程,每个子线程在创建之后,按照主线程指定的启动函数立即运行,并进行初始化。随之而来的问题是,主线程如何判断子线程已经完成初始化并启动,继续执行下去呢?这是一个需要解决的重点问题。
在设置了多个线程的情况下,需要将新创建的已连接套接字对应的读写事件交给一个 sub-reactor 线程处理。所以,这里从 thread_pool 中取出一个线程,通知这个线程有新的事件加入。而这个线程很可能是处于事件分发的阻塞调用之中,如何协调主线程数据写入给子线程,这是另一个需要解决的重点问题。
子线程是一个 event_loop 线程,它阻塞在 dispatch 上,一旦有事件发生,它就会查找 channel_map,找到对应的处理函数并执行它。之后它就会增加、删除或修改 pending 事件,再次进入下一轮的 dispatch。
文稿中放置了一张图,阐述了线程的运行关系:
为了方便你理解,我把对应的函数实现列在了另外一张图中。
主线程需要等待子线程完成初始化,也就是需要获取子线程对应数据的反馈,而子线程初始化也是对这部分数据进行初始化,实际上这是一个多线程的通知问题。采用的做法在前面提到过,使用 mutex 和 condition 两个主要武器。
下面这段代码是主线程发起的子线程创建,调用 event_loop_thread_init 对每个子线程初始化,之后调用 event_loop_thread_start 来启动子线程。注意,如果应用程序指定的线程池大小为 0,则直接返回,这样 acceptor 和 I/O 事件都会在同一个主线程里处理,就退化为单 reactor 模式。
//一定是main thread发起
void thread_pool_start(struct thread_pool *threadPool) {assert(!threadPool->started);assertInSameThread(threadPool->mainLoop);threadPool->started &#61; 1;void *tmp;if (threadPool->thread_number <&#61; 0) {return;}threadPool->eventLoopThreads &#61; malloc(threadPool->thread_number * sizeof(struct event_loop_thread));for (int i &#61; 0; i
}
我们再看一下 event_loop_thread_start 这个方法&#xff0c;这个方法一定是主线程运行的。这里我使用了 pthread_create 创建了子线程&#xff0c;子线程一旦创建&#xff0c;立即执行 event_loop_thread_run&#xff0c;我们稍后将看到&#xff0c;event_loop_thread_run 进行了子线程的初始化工作。event_loop_thread_start 最重要的部分是使用了 pthread_mutex_lock 和 pthread_mutex_unlock 进行了加锁和解锁&#xff0c;并使用了 pthread_cond_wait 来守候 eventLoopThread 中的 eventLoop 的变量。
//由主线程调用&#xff0c;初始化一个子线程&#xff0c;并且让子线程开始运行event_loop
struct event_loop *event_loop_thread_start(struct event_loop_thread *eventLoopThread) {pthread_create(&eventLoopThread->thread_tid, NULL, &event_loop_thread_run, eventLoopThread);assert(pthread_mutex_lock(&eventLoopThread->mutex) &#61;&#61; 0);while (eventLoopThread->eventLoop &#61;&#61; NULL) {assert(pthread_cond_wait(&eventLoopThread->cond, &eventLoopThread->mutex) &#61;&#61; 0);}assert(pthread_mutex_unlock(&eventLoopThread->mutex) &#61;&#61; 0);yolanda_msgx("event loop thread started, %s", eventLoopThread->thread_name);return eventLoopThread->eventLoop;
}
为什么要这么做呢&#xff1f;看一下子线程的代码你就会大致明白。子线程执行函数 event_loop_thread_run 一上来也是进行了加锁&#xff0c;之后初始化 event_loop 对象&#xff0c;当初始化完成之后&#xff0c;调用了 pthread_cond_signal 函数来通知此时阻塞在 pthread_cond_wait 上的主线程。这样&#xff0c;主线程就会从 wait 中苏醒&#xff0c;代码得以往下执行。子线程本身也通过调用 event_loop_run 进入了一个无限循环的事件分发执行体中&#xff0c;等待子线程 reactor 上注册过的事件发生。
void *event_loop_thread_run(void *arg) {struct event_loop_thread *eventLoopThread &#61; (struct event_loop_thread *) arg;pthread_mutex_lock(&eventLoopThread->mutex);// 初始化化event loop&#xff0c;之后通知主线程eventLoopThread->eventLoop &#61; event_loop_init();yolanda_msgx("event loop thread init and signal, %s", eventLoopThread->thread_name);pthread_cond_signal(&eventLoopThread->cond);pthread_mutex_unlock(&eventLoopThread->mutex);//子线程event loop runeventLoopThread->eventLoop->thread_name &#61; eventLoopThread->thread_name;event_loop_run(eventLoopThread->eventLoop);
}
可以看到&#xff0c;这里主线程和子线程共享的变量正是每个 event_loop_thread 的 eventLoop 对象&#xff0c;这个对象在初始化的时候为 NULL&#xff0c;只有当子线程完成了初始化&#xff0c;才变成一个非 NULL 的值&#xff0c;这个变化是子线程完成初始化的标志&#xff0c;也是信号量守护的变量。通过使用锁和信号量&#xff0c;解决了主线程和子线程同步的问题。当子线程完成初始化之后&#xff0c;主线程才会继续往下执行。
struct event_loop_thread {struct event_loop *eventLoop;pthread_t thread_tid; /* thread ID */pthread_mutex_t mutex;pthread_cond_t cond;char * thread_name;long thread_count; /* # connections handled */
};
你可能会问&#xff0c;主线程是循环在等待每个子线程完成初始化&#xff0c;如果进入第二个循环&#xff0c;等待第二个子线程完成初始化&#xff0c;而此时第二个子线程已经初始化完成了&#xff0c;该怎么办&#xff1f;注意我们这里一上来是加锁的&#xff0c;只要取得了这把锁&#xff0c;同时发现 event_loop_thread 的 eventLoop 对象已经变成非 NULL 值&#xff0c;可以肯定第二个线程已经初始化&#xff0c;就直接释放锁往下执行了。
你可能还会问&#xff0c;在执行 pthread_cond_wait 的时候&#xff0c;需要持有那把锁么&#xff1f;这里&#xff0c;父线程在调用 pthread_cond_wait 函数之后&#xff0c;会立即进入睡眠&#xff0c;并释放持有的那把互斥锁。而当父线程再从 pthread_cond_wait 返回时&#xff08;这是子线程通过 pthread_cond_signal 通知达成的&#xff09;&#xff0c;该线程再次持有那把锁。
前面提到&#xff0c;主线程是一个 main reactor 线程&#xff0c;这个线程负责检测监听套接字上的事件&#xff0c;当有事件发生时&#xff0c;也就是一个连接已完成建立&#xff0c;如果我们有多个 sub-reactor 子线程&#xff0c;我们期望的结果是&#xff0c;把这个已连接套接字相关的 I/O 事件交给 sub-reactor 子线程负责检测。这样的好处是&#xff0c;main reactor 只负责连接套接字的建立&#xff0c;可以一直维持在一个非常高的处理效率&#xff0c;在多核的情况下&#xff0c;多个 sub-reactor 可以很好地利用上多核处理的优势。
我们知道&#xff0c;sub-reactor 线程是一个无限循环的 event loop 执行体&#xff0c;在没有已注册事件发生的情况下&#xff0c;这个线程阻塞在 event_dispatcher 的 dispatch 上。你可以简单地认为阻塞在 poll 调用或者 epoll_wait 上&#xff0c;这种情况下&#xff0c;主线程如何能把已连接套接字交给 sub-reactor 子线程呢&#xff1f;
如果我们能让 sub-reactor 线程从 event_dispatcher 的 dispatch 上返回&#xff0c;再让 sub-reactor 线程返回之后能够把新的已连接套接字事件注册上&#xff0c;这件事情就算完成了。
那如何让 sub-reactor 线程从 event_dispatcher 的 dispatch 上返回呢&#xff1f;答案是构建一个类似管道一样的描述字&#xff0c;让 event_dispatcher 注册该管道描述字&#xff0c;当我们想让 sub-reactor 线程苏醒时&#xff0c;往管道上发送一个字符就可以了。
在 event_loop_init 函数里&#xff0c;调用了 socketpair 函数创建了套接字对&#xff0c;这个套接字对的作用就是我刚刚说过的&#xff0c;往这个套接字的一端写时&#xff0c;另外一端就可以感知到读的事件。其实&#xff0c;这里也可以直接使用 UNIX 上的 pipe 管道&#xff0c;作用是一样的。
struct event_loop *event_loop_init() {...//add the socketfd to event 这里创建的是套接字对&#xff0c;目的是为了唤醒子线程eventLoop->owner_thread_id &#61; pthread_self();if (socketpair(AF_UNIX, SOCK_STREAM, 0, eventLoop->socketPair) <0) {LOG_ERR("socketpair set fialed");}eventLoop->is_handle_pending &#61; 0;eventLoop->pending_head &#61; NULL;eventLoop->pending_tail &#61; NULL;eventLoop->thread_name &#61; "main thread";struct channel *channel &#61; channel_new(eventLoop->socketPair[1], EVENT_READ, handleWakeup, NULL, eventLoop);event_loop_add_channel_event(eventLoop, eventLoop->socketPair[0], channel);return eventLoop;
}
要特别注意的是文稿中的这句代码&#xff0c;这告诉 event_loop 的&#xff0c;是注册了 socketPair[1]描述字上的 READ 事件&#xff0c;如果有 READ 事件发生&#xff0c;就调用 handleWakeup 函数来完成事件处理。
struct channel *channel &#61; channel_new(eventLoop->socketPair[1], EVENT_READ, handleWakeup, NULL, eventLoop);
事实上&#xff0c;这个函数就是简单的从 socketPair[1]描述字上读取了一个字符而已&#xff0c;除此之外&#xff0c;它什么也没干。它的主要作用就是让子线程从 dispatch 的阻塞中苏醒。
int handleWakeup(void * data) {struct event_loop *eventLoop &#61; (struct event_loop *) data;char one;ssize_t n &#61; read(eventLoop->socketPair[1], &one, sizeof one);if (n !&#61; sizeof one) {LOG_ERR("handleWakeup failed");}yolanda_msgx("wakeup, %s", eventLoop->thread_name);
}
现在&#xff0c;我们再回过头看看&#xff0c;如果有新的连接产生&#xff0c;主线程是怎么操作的&#xff1f;在 handle_connection_established 中&#xff0c;通过 accept 调用获取了已连接套接字&#xff0c;将其设置为非阻塞套接字&#xff08;切记&#xff09;&#xff0c;接下来调用 thread_pool_get_loop 获取一个 event_loop。thread_pool_get_loop 的逻辑非常简单&#xff0c;从 thread_pool 线程池中按照顺序挑选出一个线程来服务。接下来是创建了 tcp_connection 对象。
//处理连接已建立的回调函数
int handle_connection_established(void *data) {struct TCPserver *tcpServer &#61; (struct TCPserver *) data;struct acceptor *acceptor &#61; tcpServer->acceptor;int listenfd &#61; acceptor->listen_fd;struct sockaddr_in client_addr;socklen_t client_len &#61; sizeof(client_addr);//获取这个已建立的套集字&#xff0c;设置为非阻塞套集字int connected_fd &#61; accept(listenfd, (struct sockaddr *) &client_addr, &client_len);make_nonblocking(connected_fd);yolanda_msgx("new connection established, socket &#61;&#61; %d", connected_fd);//从线程池里选择一个eventloop来服务这个新的连接套接字struct event_loop *eventLoop &#61; thread_pool_get_loop(tcpServer->threadPool);// 为这个新建立套接字创建一个tcp_connection对象&#xff0c;并把应用程序的callback函数设置给这个tcp_connection对象struct tcp_connection *tcpConnection &#61; tcp_connection_new(connected_fd, eventLoop,tcpServer->connectionCompletedCallBack,tcpServer->connectionClosedCallBack,tcpServer->messageCallBack,tcpServer->writeCompletedCallBack);//callback内部使用if (tcpServer->data !&#61; NULL) {tcpConnection->data &#61; tcpServer->data;}return 0;
}
在调用 tcp_connection_new 创建 tcp_connection 对象的代码里&#xff0c;可以看到先是创建了一个 channel 对象&#xff0c;并注册了 READ 事件&#xff0c;之后调用 event_loop_add_channel_event 方法往子线程中增加 channel 对象。
tcp_connection_new(int connected_fd, struct event_loop *eventLoop,connection_completed_call_back connectionCompletedCallBack,connection_closed_call_back connectionClosedCallBack,message_call_back messageCallBack, write_completed_call_back writeCompletedCallBack) {...//为新的连接对象创建可读事件struct channel *channel1 &#61; channel_new(connected_fd, EVENT_READ, handle_read, handle_write, tcpConnection);tcpConnection->channel &#61; channel1;//完成对connectionCompleted的函数回调if (tcpConnection->connectionCompletedCallBack !&#61; NULL) {tcpConnection->connectionCompletedCallBack(tcpConnection);}//把该套集字对应的channel对象注册到event_loop事件分发器上event_loop_add_channel_event(tcpConnection->eventLoop, connected_fd, tcpConnection->channel);return tcpConnection;
}
请注意&#xff0c;到现在为止的操作都是在主线程里执行的。下面的 event_loop_do_channel_event 也不例外&#xff0c;接下来的行为我期望你是熟悉的&#xff0c;那就是加解锁。如果能够获取锁&#xff0c;主线程就会调用 event_loop_channel_buffer_nolock 往子线程的数据中增加需要处理的 channel event 对象。所有增加的 channel 对象以列表的形式维护在子线程的数据结构中。接下来的部分是重点&#xff0c;如果当前增加 channel event 的不是当前 event loop 线程自己&#xff0c;就会调用 event_loop_wakeup 函数把 event_loop 子线程唤醒。唤醒的方法很简单&#xff0c;就是往刚刚的 socketPair[0]上写一个字节&#xff0c;别忘了&#xff0c;event_loop 已经注册了 socketPair[1]的可读事件。如果当前增加 channel event 的是当前 event loop 线程自己&#xff0c;则直接调用 event_loop_handle_pending_channel 处理新增加的 channel event 事件列表。
int event_loop_do_channel_event(struct event_loop *eventLoop, int fd, struct channel *channel1, int type) {//get the lockpthread_mutex_lock(&eventLoop->mutex);assert(eventLoop->is_handle_pending &#61;&#61; 0);//往该线程的channel列表里增加新的channelevent_loop_channel_buffer_nolock(eventLoop, fd, channel1, type);//release the lockpthread_mutex_unlock(&eventLoop->mutex);//如果是主线程发起操作&#xff0c;则调用event_loop_wakeup唤醒子线程if (!isInSameThread(eventLoop)) {event_loop_wakeup(eventLoop);} else {//如果是子线程自己&#xff0c;则直接可以操作event_loop_handle_pending_channel(eventLoop);}return 0;
}
如果是 event_loop 被唤醒之后&#xff0c;接下来也会执行 event_loop_handle_pending_channel 函数。你可以看到在循环体内从 dispatch 退出之后&#xff0c;也调用了 event_loop_handle_pending_channel 函数。
int event_loop_run(struct event_loop *eventLoop) {assert(eventLoop !&#61; NULL);struct event_dispatcher *dispatcher &#61; eventLoop->eventDispatcher;if (eventLoop->owner_thread_id !&#61; pthread_self()) {exit(1);}yolanda_msgx("event loop run, %s", eventLoop->thread_name);struct timeval timeval;timeval.tv_sec &#61; 1;while (!eventLoop->quit) {//block here to wait I/O event, and get active channelsdispatcher->dispatch(eventLoop, &timeval);//这里处理pending channel&#xff0c;如果是子线程被唤醒&#xff0c;这个部分也会立即执行到event_loop_handle_pending_channel(eventLoop);}yolanda_msgx("event loop end, %s", eventLoop->thread_name);return 0;
}
event_loop_handle_pending_channel 函数的作用是遍历当前 event loop 里 pending 的 channel event 列表&#xff0c;将它们和 event_dispatcher 关联起来&#xff0c;从而修改感兴趣的事件集合。这里有一个点值得注意&#xff0c;因为 event loop 线程得到活动事件之后&#xff0c;会回调事件处理函数&#xff0c;这样像 onMessage 等应用程序代码也会在 event loop 线程执行&#xff0c;如果这里的业务逻辑过于复杂&#xff0c;就会导致 event_loop_handle_pending_channel 执行的时间偏后&#xff0c;从而影响 I/O 的检测。所以&#xff0c;将 I/O 线程和业务逻辑线程隔离&#xff0c;让 I/O 线程只负责处理 I/O 交互&#xff0c;让业务线程处理业务&#xff0c;是一个比较常见的做法。
温故而知新 &#xff01;