作者:mobiledu2502926247 | 来源:互联网 | 2023-06-14 18:15
前言上篇是在看ngx_worker_process_cycle()中的一个插曲,这篇继续,现在还不可能做到全知全解,只能随着源码的深入慢慢地了解。worker进程循环工作N
前言
上篇是在看ngx_worker_process_cycle()中的一个插曲,这篇继续,现在还不可能做到全知全解,只能随着源码的深入慢慢地了解。
worker进程循环工作
Nginx采用信号的IPC方式对worker进程进行控制,其中的ngx_terminate、ngx_quit、ngx_repon都将由在ngx_signal_handler方法根据收到的信号进行处理。而信号的初始化都是在main()中的ngx_init_signals()中完成的。
下面看源码吧:
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_int_t worker = (intptr_t) data;
ngx_uint_t i;
ngx_connection_t *c;
/* 从master进程继承过来的全局变量,设置其为worker进程 */
ngx_process = NGX_PROCESS_WORKER;
/* Worker进程初始化,下面详述 */
ngx_worker_process_init(cycle, worker);
/* 更改进程名字,这个很有意思...详见《笔记十五》 */
ngx_setproctitle("worker process");
/* 针对worker子进程中的线程机制的,一般用不到,这里通过设置NGX_THREADS宏来开启 */
#if (NGX_THREADS)
{
ngx_int_t n;
ngx_err_t err;
ngx_core_conf_t *ccf;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
if (ngx_threads_n) {
if (ngx_init_threads(ngx_threads_n, ccf->thread_stack_size, cycle)
== NGX_ERROR)
{
/* fatal */
exit(2);
}
err = ngx_thread_key_create(&ngx_core_tls_key);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
ngx_thread_key_create_n " failed");
/* fatal */
exit(2);
}
for (n = 0; n
ngx_threads[n].cv = ngx_cond_init(cycle->log);
if (ngx_threads[n].cv == NULL) {
/* fatal */
exit(2);
}
if (ngx_create_thread((ngx_tid_t *) &ngx_threads[n].tid,
ngx_worker_thread_cycle,
(void *) &ngx_threads[n], cycle->log)
!= 0)
{
/* fatal */
exit(2);
}
}
}
}
#endif
/* 这里Nginx是通过采用信号的方式,控制worker进程运行,具体的处理信号的方法
* 其实在main()函数中的ngx_init_signals()初始化工作中已经弄好,可以见signals[]数组中的
* ngx_signal_handler()处理函数,之后打算好好看看ngx_init_signals()这个函数
*/
for ( ;; ) {
/* ngx_exiting将在worker进程收到SIGQUIT信号后设置,具体设置就在下面的ngx_quit语句中 */
if (ngx_exiting) {
c = cycle->connections;
/* 这是在worker进程收到quit信号退出前,必须将connections上读到的事件处理完 */
for (i = 0; i connection_n; i++) {
/* THREAD: lock */
if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}
/* 这里就可以调用ngx_worker_process_exit退出了,这个if条件用到红黑树。。怎么用到的,以后在细究 */
if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
{
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
/* 这个就是worker进程具体的时间和定时器核心处理函数!!!以后慢慢看! */
ngx_process_events_and_timers(cycle);
/* worker进程收到了SIGINT信号,直接强制退出 */
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
/* worker进程收到了SIGQUIT信号,如果此时worker进程不是不是处于exiting状态,
* 就将设置ngx_exiting为1,让其进入exiting状态;同时关闭监听套接口,待看...
*/
if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"gracefully shutting down");
ngx_setproctitle("worker process is shutting down");
if (!ngx_exiting) {
ngx_close_listening_sockets(cycle);
ngx_exiting = 1;
}
}
/* worker进程收到了SIGUSR1信号,通知Nginx需要重新打开文件 */
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
}
}
下面是ngx_worker_process_init()的工作:
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
sigset_t set;
uint64_t cpu_affinity;
ngx_int_t n;
ngx_uint_t i;
struct rlimit rlmt;
ngx_core_conf_t *ccf;
ngx_listening_t *ls;
/* 设置全局的环境变量environ,不细看了,细看暂时也弄不明白 */
if (ngx_set_environment(cycle, NULL) == NULL) {
/* fatal */
exit(2);
}
/* 获取核心模块配置 */
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
/* 设置worker进程优先级 */
if (worker >= 0 && ccf->priority != 0) {
if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setpriority(%d) failed", ccf->priority);
}
}
/* 这里是给成员做资源使用的限制,可以查下setrlimit()系统调用 */
if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;
if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setrlimit(RLIMIT_NOFILE, %i) failed",
ccf->rlimit_nofile);
}
}
if (ccf->rlimit_core != NGX_CONF_UNSET) {
rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
rlmt.rlim_max = (rlim_t) ccf->rlimit_core;
if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setrlimit(RLIMIT_CORE, %O) failed",
ccf->rlimit_core);
}
}
#ifdef RLIMIT_SIGPENDING
if (ccf->rlimit_sigpending != NGX_CONF_UNSET) {
rlmt.rlim_cur = (rlim_t) ccf->rlimit_sigpending;
rlmt.rlim_max = (rlim_t) ccf->rlimit_sigpending;
if (setrlimit(RLIMIT_SIGPENDING, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setrlimit(RLIMIT_SIGPENDING, %i) failed",
ccf->rlimit_sigpending);
}
}
#endif
/* geteuid()用于获取执行目前进程有效的用户识别码,root用户的uid为0,这里是说如果拥有root超级权限 */
if (geteuid() == 0) {
/* 设置组ID,目的猜测是多个worker进程共享同一个目录下的文件 */
if (setgid(ccf->group) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setgid(%d) failed", ccf->group);
/* fatal */
exit(2);
}
/* 在user中设置组ID,猜测 */
if (initgroups(ccf->username, ccf->group) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"initgroups(%s, %d) failed",
ccf->username, ccf->group);
}
/* 将worker进程设置为拥有该文件所有者同样的权限 */
if (setuid(ccf->user) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setuid(%d) failed", ccf->user);
/* fatal */
exit(2);
}
}
/* 这里九分猜测是给worker进程分配CPU */
if (worker >= 0) {
cpu_affinity = ngx_get_cpu_affinity(worker);
if (cpu_affinity) {
ngx_setaffinity(cpu_affinity, cycle->log);
}
}
#if (NGX_HAVE_PR_SET_DUMPABLE)
/* allow coredump after setuid() in Linux 2.4.x */
if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"prctl(PR_SET_DUMPABLE) failed");
}
#endif
/* 这里是更改工作路径 */
if (ccf->working_directory.len) {
if (chdir((char *) ccf->working_directory.data) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"chdir(\"%s\") failed", ccf->working_directory.data);
/* fatal */
exit(2);
}
}
/* 初始化信号集 */
sigemptyset(&set);
/* 将set集中的信号设为阻塞。
* 这里是让worker进程对set集中的信号延迟处理
*/
if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
/*
* disable deleting previous events for the listening sockets because
* in the worker processes there are no events at all at this point
*/
/* worker子进程所有的监听端口初始化 */
ls = cycle->listening.elts;
for (i = 0; i listening.nelts; i++) {
ls[i].previous = NULL;
}
/* worker子进程进行所有模块的自定义初始化 */
for (i = 0; ngx_modules[i]; i++) {
if (ngx_modules[i]->init_process) {
if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
/* fatal */
exit(2);
}
}
}
/* ngx_processes[]是一个所有worker进程共享的全局变量,这里为了保证当前worker进程
* 禁止读取master进程给其他worker进程的消息,所以这里必须关闭所有其他worker进程的读端描述符。
* 然后,保留对其他worker进程的写端描述符和自身的读端描述符,目的是能够保持所有worker进程间的通信。
* 其他所有worker进程也都如此做。一句话就是,通过其他worker进程写端写入消息给别人,通过自身读端来接收来自master
* 和其他worker进程的消息
*/
for (n = 0; n
/* 不存在的worker进程,跳过 */
if (ngx_processes[n].pid == -1) {
continue;
}
/* 该worker进程本身,跳过 */
if (n == ngx_process_slot) {
continue;
}
/* channel[1]已关闭,跳过 */
if (ngx_processes[n].channel[1] == -1) {
continue;
}
/* 对其他worker进程的读端文件描述符关闭,保留写端文件描述符保持worker间的通信之用 */
if (close(ngx_processes[n].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
}
/* 关闭自身的写端文件描述符,因为每个worker进程只需要从自己的读端读取消息,而不用给自己写消息。
* 需要用到自身的写端文件描述符的是master和其他worker进程。这也是上面为什么要关闭其他worker进程的读端,保留写端的原因。
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
#if 0
ngx_last_process = 0;
#endif
/* 这里是令Nginx开始关注当前worker进程中channel读端的读事件,下面稍微描述下,现在还不能一窥全景 */
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler)
== NGX_ERROR)
{
/* fatal */
exit(2);
}
}
ngx_int_t
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,
ngx_event_handler_pt handler)
{
ngx_event_t *ev, *rev, *wev;
ngx_connection_t *c;
/* 这些个参数配置看是看了,但暂时不清楚怎么用,以后再来说 */
c = ngx_get_connection(fd, cycle->log);
if (c == NULL) {
return NGX_ERROR;
}
c->pool = cycle->pool;
rev = c->read;
wev = c->write;
rev->log = cycle->log;
wev->log = cycle->log;
#if (NGX_THREADS)
rev->lock = &c->lock;
wev->lock = &c->lock;
rev->own_lock = &c->lock;
wev->own_lock = &c->lock;
#endif
rev->channel = 1;
wev->channel = 1;
ev = (event == NGX_READ_EVENT) ? rev : wev;
ev->handler = handler;
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_free_connection(c);
return NGX_ERROR;
}
} else {
/* 猜的不错,这个应该是重点,调用ngx_add_event宏,在这里如果是epoll的话,
* 应该是调用ngx_epoll_add_event函数,将该channel上的读事件加入到epoll
*/
if (ngx_add_event(ev, event, 0) == NGX_ERROR) {
ngx_free_connection(c);
return NGX_ERROR;
}
}
return NGX_OK;
}
总结
分析源码要考虑前因后果,单单停留在代码的表面表示会有很多难度。看完这个worker循环,对信号量的设置,以及进程初始化的工作有了个初步的认识。另外,就单独对于代码技巧方面,比如加入epoll事件等等。。。Nginx确实是一套很牛的代码。