作者:李乐
本文基于Swoole-4.3.2和PHP-7.1.0版本
Swoole4为 PHP 语言提供了强大的CSP协程编程模式,用户可以通过 go 函数创建一个协程,以达到并发执行的效果,如下面代码所示:
其实在Swoole4之前就实现了多协程编程模式,在协程创建、切换以及结束的时候,相应的操作php栈即可(创建、切换以及回收php栈)。
此时的协程实现无法完美的支持php语法,其根本原因在于没有保存c栈信息。(vm内部或者某些扩展提供的API是通过c函数实现的,调用这些函数时如果发生协程切换,c栈该如何处理?)
Swoole4新增了c栈的管理,在协程创建、切换以及结束的同时会伴随着c栈的创建、切换以及回收。
Swoole4协程实现方案如下图所示:
其中:
对应c栈的创建以及换入换出。
函数是对代码的封装,对外暴露的只是一组指定的参数和一个可选的返回值;假设函数P调用函数Q,Q执行后返回函数P,实现该函数调用需要考虑以下三点:
大多数语言的函数调用都采用了栈结构实现,函数的调用与返回即对应的是一系列的入栈与出栈操作,我们通常称之为函数栈帧(stack frame)。示意图如下:
上面提到的程序计数器即寄存器%rip,另外还有两个寄存器需要重点关注:%rbp指向栈帧底部,%rsp指向栈帧顶部。
下面将通过具体的代码事例,为读者讲解函数栈帧。c代码与汇编代码如下:
int add(int x, int y) { int a, b; a = 10; b = 5; return x+y; } int main() { int sum = add(1,2); }
main: pushq %rbp movq %rsp, %rbp subq $16, %rsp movl $2, %esi movl $1, %edi call add movl %eax, -4(%rbp) leave ret add: pushq %rbp movq %rsp, %rbp movl %edi, -20(%rbp) movl %esi, -24(%rbp) movl $10, -4(%rbp) movl $5, -8(%rbp) movl -24(%rbp), %eax movl -20(%rbp), %edx addl %edx, %eax popq %rbp ret
分析汇编代码:
问题:观察上面的汇编代码,输入参数分别使用的是寄存器%edi和%esi,返回值使用的是寄存器%eax,输入输出参数不应该保存在栈上吗?寄存器比内存访问要快的多,现代处理器寄存器数目也比较多,因此倾向于将参数优先保存在寄存器。比如%rdi, %rsi, %rdx, %rcx, %r8d, %r9d 六个寄存器用于存储函数调用时的前6个参数,那么当输入参数数目超过6个时,如何处理?这些输入参数只能存储在栈上了。
(%rdi等表示64位寄存器,%edi等表示32位寄存器)
//add函数需要9个参数 add(1,2,3,4,5,6,7,8,9); //参数7,8,9存储在栈上 movl $9, 16(%rsp) movl $8, 8(%rsp) movl $7, (%rsp) movl $6, %r9d movl $5, %r8d movl $4, %ecx movl $3, %edx movl $2, %esi movl $1, %edi
通过学习c栈基本知识,我们知道最主要有三个寄存器:%rip程序计数器指向下一条需要执行的指令,%rbp指向函数栈帧底部,%rsp指向函数栈帧顶部。这三个寄存器可以确定一个c栈执行上下文,c栈的管理其实就是这些寄存器的管理。
第一节我们提到Swoole在管理c栈时,用到了 boost.context库,其中make_fcontext()和jump_fcontext()函数均使用汇编语言编写,实现了c栈执行上下文的创建以及切换;函声明命如下:
fcontext_t make_fcontext(void *sp, size_t size, void (*fn)(intptr_t)); intptr_t jump_fcontext(fcontext_t *ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);
make_fcontext函数用于创建一个执行上下文,其中参数sp指向内存最高地址处(在堆中分配一块内存作为该执行上下文的c栈),参数size为栈大小,参数fn是一个函数指针,指向该执行上下文的入口函数;代码主要逻辑如下:
/*%rdi表示第一个参数sp,指向栈顶*/ movq %rdi, %rax //保证%rax指向的地址按照16字节对齐 andq $-16, %rax //将%rax向低地址处偏移0x48字节 leaq -0x48(%rax), %rax /* %rdx表示第三个参数fn,保存在%rax偏移0x38位置处 */ movq %rdx, 0x38(%rax) stmxcsr (%rax) fnstcw 0x4(%rax) leaq finish(%rip), %rcx movq %rcx, 0x40(%rax) //返回值保存在%rax寄存器 ret
make_fcontext函数创建的执行上下文示意图如下(可以看到预留了若干字节用于保存上下文信息):
Swoole协程实现的Context层封装了上下文的创建,创建上下文函数实现如下:
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { stack_ = (char*) sw_malloc(stack_size_); void* sp = (void*) ((char*) stack_ + stack_size_); ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }
可以看到c栈执行上下文是通过sw_malloc函数在堆上分配的一块内存,默认大小为2M字节;参数sp指向的是内存最高地址处;执行上下文的入口函数为Context::context_func()。
jump_fcontext函数用于切换c栈上下文:1)函数会将当前上下文(寄存器)保存在当前栈顶(push),同时将%rsp寄存器内容保存在ofc地址;2)函数从nfc地址处恢复%rsp寄存器内容,同时从栈顶恢复上下文信息(pop)。代码主要逻辑如下:
//-------------------保存当前c栈上下文------------------- pushq %rbp /* save RBP */ pushq %rbx /* save RBX */ pushq %r15 /* save R15 */ pushq %r14 /* save R14 */ pushq %r13 /* save R13 */ pushq %r12 /* save R12 */ leaq -0x8(%rsp), %rsp stmxcsr (%rsp) fnstcw 0x4(%rsp) //%rdi表示第一个参数,即ofc,保存%rsp到ofc地址处 movq %rsp, (%rdi) //-------------------从nfc中恢复上下文------------------- //%rsi表示第二个参数,即nfc,从nfc地址处恢复%rsp movq %rsi, %rsp ldmxcsr (%rsp) fldcw 0x4(%rsp) leaq 0x8(%rsp), %rsp popq %r12 /* restrore R12 */ popq %r13 /* restrore R13 */ popq %r14 /* restrore R14 */ popq %r15 /* restrore R15 */ popq %rbx /* restrore RBX */ popq %rbp /* restrore RBP */ //这里弹出的其实是之前保存的%rip popq %r8 //%rdx表示第三个参数,%rax用于存储函数返回值; movq %rdx, %rax //%rdi用于存储第一个参数 movq %rdx, %rdi //跳转到%r8指向的地址 jmp *%r8
观察jump_fcontext函数的汇编代码,可以看到保存上下文与恢复上下文的代码基本是对称的。恢复上下文时"popq %r8"用于弹出上一次保存的程序计数器%rip的内容,然而并没有看到保存寄存器%rip的代码。这是因为调用jump_fcontext函数时,底层call指令已经将%rip入栈了。
Swoole协程实现的Context层封装了上下文的换入换出,可以在上下文swap_ctx_和ctx_之间随时换入换出,代码实现如下:
bool Context::SwapIn() { jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); return true; } bool Context::SwapOut() { jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true); return true; }
上下文示意图如下所示:
php代码在执行时,同样存在函数栈帧的分配与回收。php将此抽象为两个结构,php栈zend_vm_stack,与执行数据(函数栈帧)zend_execute_data。
php栈结构与c栈结构基本类似,定义如下:
struct _zend_vm_stack { zval *top; zval *end; zend_vm_stack prev; };
其中top字段指向栈顶位置,end字段指向栈底位置;prev指向上一个栈,形成链表,当栈空间不够时,可以进行扩容。php虚拟机申请栈空间时默认大小为256K,Swoole创建栈空间时默认大小为8K。
执行数据结构体,我们需要重点关注这几个字段:当前函数编译后的指令集(opline指向指令集数组中的某一个元素,虚拟机只需要遍历该数组并执行所有指令即可),函数返回值,以及调用该函数的执行数据;结构定义如下:
struct _zend_execute_data { //当前执行指令 const zend_op *opline; zend_execute_data *call; //函数返回值 zval *return_value; zend_function *func; zval This; /* this + call_info + num_args */ //调用当前函数的栈帧 zend_execute_data *prev_execute_data; //符号表 zend_array *symbol_table; #if ZEND_EX_USE_RUN_TIME_CACHE void **run_time_cache; #endif #if ZEND_EX_USE_LITERALS //常量数组 zval *literals; #endif };
php栈初始化函数为zend_vm_stack_init;当执行用户函数调用时,虚拟机通过函数zend_vm_stack_push_call_frame在php栈上分配新的执行数据,并执行该函数代码;函数执行完成后,释放该执行数据。代码逻辑如下:
ZEND_API void zend_execute(zend_op_array *op_array, zval *return_value) { //分配新的执行数据 execute_data = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_CODE | ZEND_CALL_HAS_SYMBOL_TABLE, (zend_function*)op_array, 0, zend_get_called_scope(EG(current_execute_data)), zend_get_this_object(EG(current_execute_data))); //设置prev execute_data->prev_execute_data = EG(current_execute_data); //初始化当前执行数据,op_array即为当前函数编译得到的指令集 i_init_execute_data(execute_data, op_array, return_value); //执行函数代码 zend_execute_ex(execute_data); //释放执行数据 zend_vm_stack_free_call_frame(execute_data); }
php栈帧结构示意图如下:
Swoole协程实现,需要自己管理php栈,在发生协程创建以及切换时,对应的创建新的php栈,切换php栈,同时保存和恢复php栈上下文信息。这里涉及到一个很重要的结构体php_coro_task:
struct php_coro_task { zval *vm_stack_top; zval *vm_stack_end; zend_vm_stack vm_stack; zend_execute_data *execute_data; };
这里列出了php_coro_task结构体的若干关键字段,这些字段用于保存和恢复php上下文信息。
协程创建时,底层通过函数PHPCoroutine::create_func实现了php栈的创建:
void PHPCoroutine::create_func(void *arg) { //创建并初始化php栈 vm_stack_init(); call = (zend_execute_data *) (EG(vm_stack_top)); //为结构php_coro_task分配空间 task = (php_coro_task *) EG(vm_stack_top); EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval)); //创建新的执行数据结构 call = zend_vm_stack_push_call_frame( ZEND_CALL_TOP_FUNCTION | ZEND_CALL_ALLOCATED, func, argc, fci_cache.called_scope, fci_cache.object ); }
从代码中可以看到结构php_coro_task是直接存储在php栈的底部。
当通过yield函数让出CPU时,底层会调用函数PHPCoroutine::on_yield切换php栈:
void PHPCoroutine::on_yield(void *arg) { php_coro_task *task = (php_coro_task *) arg; php_coro_task *origin_task = get_origin_task(task); //保存当前php栈上下文信息到php_coro_task结构 save_task(task); //从php_coro_task结构中恢复php栈上下文信息 restore_task(origin_task); }
前面我们简单介绍了Swoole协程的实现方案,以及Swoole对c栈与php栈的管理,接下来将结合前面的知识,系统性的介绍Swoole协程的实现原理。
话不多说,先看一张图:
Swoole创建协程可以使用go()函数,底层实现对应的是PHP_FUNCTION(swoole_coroutine_create),其函数实现如下:
PHP_FUNCTION(swoole_coroutine_create) { …… long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params); } long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv) { …… save_task(get_task()); return Coroutine::create(create_func, (void*) &php_coro_args); } class Coroutine { public: static inline long create(coroutine_func_t fn, void* args = nullptr) { return (new Coroutine(fn, args))->run(); } }
class PHPCoroutine { public: static inline php_coro_task* get_task() { php_coro_task *task = (php_coro_task *) Coroutine::get_current_task(); return task ? task : &main_task; } }
//全局协程map std::unordered_mapCoroutine::coroutines; class Coroutine { protected: Coroutine(coroutine_func_t fn, void *private_data) : ctx(stack_size, fn, private_data) { cid = ++last_cid; coroutines[cid] = this; } inline long run() { long cid = this->cid; origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } return cid; } }
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) : fn_(fn), stack_size_(stack_size), private_data_(private_data) { …… ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); }
void Context::context_func(void *arg) { Context *_this = (Context *) arg; _this->fn_(_this->private_data_); _this->end = true; _this->SwapOut(); }
问题:参数arg为什么是Context对象呢,是如何传递的呢?这就涉及到jump_fcontext汇编实现,以及jump_fcontext的调用了
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true); jump_fcontext: movq %rdx, %rdi
调用jump_fcontext函数时,第三个参数传递的是this,即当前Context对象;而函数jump_fcontext汇编实现时,将第三个参数的内容拷贝到%rdi寄存器中,当协程换入执行函数context_func时,寄存器%rdi存储的就是第一个参数,即Context对象。
在Swoole模块初始化时,会调用函数swoole_coroutine_util_init(该函数同时声明了"Co"等短名称),该函数进一步的调用PHPCoroutine::init()方法,该方法完成了静态属性的赋值操作。
void PHPCoroutine::init() { Coroutine::set_on_yield(on_yield); Coroutine::set_on_resume(on_resume); Coroutine::set_on_close(on_close); }
用户可以通过Co::yield()和Co::resume()实现协程的让出和恢复,
Co::yield()的底层实现函数为PHP_METHOD(swoole_coroutine_util, yield),Co::resume()的底层实现函数为PHP_METHOD(swoole_coroutine_util, resume)。本节将为读者讲述协程切换的实现原理。
static unordered_mapuser_yield_coros; static PHP_METHOD(swoole_coroutine_util, yield) { Coroutine* co = Coroutine::get_current_safe(); user_yield_coros[co->get_cid()] = co; co->yield(); RETURN_TRUE; } static PHP_METHOD(swoole_coroutine_util, resume) { …… auto coroutine_iterator = user_yield_coros.find(cid); if (coroutine_iterator == user_yield_coros.end()) { swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation"); RETURN_FALSE; } user_yield_coros.erase(cid); co->resume(); }
void Coroutine::yield() { state = SW_CORO_WAITING; if (on_yield) { on_yield(task); } current = origin; ctx.SwapOut(); }
void Coroutine::resume() { state = SW_CORO_RUNNING; if (on_resume) { on_resume(task); } origin = current; current = this; ctx.SwapIn(); if (ctx.end) { close(); } }
typedef enum { SW_CORO_INIT = 0, SW_CORO_WAITING, SW_CORO_RUNNING, SW_CORO_END, } sw_coro_state;
协程切换过程比较简单,这里不做过多详述。
当我们调用Co::sleep()让协程休眠时,会换出当前协程;或者调用CoroutineSocket->recv()从socket接收数据,但socket数据还没有准备好时,会阻塞当前协程,从而使得协程换出。那么问题来了,什么时候再换入执行这个协程呢?
Swoole的socket读写使用的成熟的IO多路复用模型:epoll/kqueue/select/poll等,并且将其封装在结构体_swReactor中,其定义如下:
struct _swReactor { //超时时间 int32_t timeout_msec; //fd的读写事件处理函数 swReactor_handle handle[SW_MAX_FDTYPE]; swReactor_handle write_handle[SW_MAX_FDTYPE]; swReactor_handle error_handle[SW_MAX_FDTYPE]; //fd事件的注册修改删除以及wait //函数指针,(以epoll为例)指向的是epoll_ctl、epoll_wait int (*add)(swReactor *, int fd, int fdtype); int (*set)(swReactor *, int fd, int fdtype); int (*del)(swReactor *, int fd); int (*wait)(swReactor *, struct timeval *); void (*free)(swReactor *); //超时回调函数,结束、开始回调函数 void (*onTimeout)(swReactor *); void (*onFinish)(swReactor *); void (*onBegin)(swReactor *); }
在调用函数PHPCoroutine::create创建协程时,会校验是否已经初始化_swReactor对象,如果没有则会调用php_swoole_reactor_init函数创建并初始化main_reactor对象;
void php_swoole_reactor_init() { if (SwooleG.main_reactor == NULL) { SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor)); if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) <0) { } …… php_swoole_register_shutdown_function_prepend("swoole_event_wait"); } }
我们以epoll为例,main_reactor各回调函数如下:
reactor->OnFinish= swReactor_onFinish; reactor->OnTimeout= swReactor_onTimeout; reactor->add = swReactorEpoll_add; reactor->set = swReactorEpoll_set; reactor->del = swReactorEpoll_del; reactor->wait = swReactorEpoll_wait; reactor->free = swReactorEpoll_free;
注意:这里注册了一个函数swoole_event_wait,在生命周期register_shutdown阶段会执行该函数,开始Swoole的事件循环,阻挡了php生命周期的结束。
类Socket封装了socket读写相关的所有操作以及数据结构,其定义如下:
class Socket { public: swConnection *socket = nullptr; //读写函数 ssize_t recv(void *__buf, size_t __n); ssize_t send(const void *__buf, size_t __n); …… private: swReactor *reactor = nullptr; Coroutine *read_co = nullptr; Coroutine *write_co = nullptr; //连接超时时间,接收数据、发送数据超时时间 double connect_timeout = default_connect_timeout; double read_timeout = default_read_timeout; double write_timeout = default_write_timeout; }
void Socket::init_sock(int _fd) { reactor = SwooleG.main_reactor; //设置协程类型fd(SW_FD_CORO_SOCKET)的读写事件处理函数 if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) { reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, readable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, writable_event_callback); reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, error_event_callback); } }
当我们调用CoroutineSocket->recv接收数据时,底层实现如下:
Socket::timeout_setter ts(sock->socket, timeout, SW_TIMEOUT_READ); ssize_t bytes = all ? sock->socket->recv_all(ZSTR_VAL(buf), length) : sock->socket->recv(ZSTR_VAL(buf), length);
类timeout_setter会设置socket的接收数据超时时间read_timeout为timeout。
函数socket->recv_all会循环读取数据,直到读取到指定长度的数据,或者底层返回等待标识阻塞当前协程:
ssize_t Socket::recv_all(void *__buf, size_t __n) { timer_controller timer(&read_timer, read_timeout, this, timer_callback); while (true) { do { retval = swConnection_recv(socket, (char *) __buf + total_bytes, __n - total_bytes, 0); } while (retval <0 && swConnection_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ)); if (unlikely(retval <= 0)) { break; } total_bytes += retval; if ((size_t) total_bytes == __n) { break; } } }
class timer_controller { public: bool start() { if (timeout > 0) { *timer_pp = swTimer_add(&SwooleG.timer, (long) (timeout * 1000), 0, data, callback); } } }
struct _swTimer { swHeap *heap; //最小堆 swHashMap *map; //map,定时器ID作为key //最早的定时任务触发时间 long _next_msec; //函数指针,指向swReactorTimer_set int (*set)(swTimer *timer, long exec_msec); //函数指针,指向swReactorTimer_free void (*free)(swTimer *timer); };
if (timer->_next_msec <0 || timer->_next_msec > _msec) { timer->set(timer, _msec); timer->_next_msec = _msec; } static int swReactorTimer_set(swTimer *timer, long exec_msec) { SwooleG.main_reactor->timeout_msec = exec_msec; return SW_OK; }
bool Socket::wait_event(const enum swEvent_type event, const void **__buf, size_t __n) { if (unlikely(!add_event(event))) { return false; } if (likely(event == SW_EVENT_READ)) { read_co = co; read_co->yield(); read_co = nullptr; } else // if (event == SW_EVENT_WRITE) { write_co = co; write_co->yield(); write_co = nullptr; } }
上面提到,创建协程时,注册了一个函数swoole_event_wait,在生命周期register_shutdown阶段会执行该函数,开始Swoole的事件循环,阻挡了php生命周期的结束。函数swoole_event_wait底层就是调用main_reactor->wait等待fd读写事件的产生;我们以epoll为例讲述事件循环的逻辑:
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { while (reactor->running > 0) { n = epoll_wait(epoll_fd, events, max_event_num, swReactor_get_timeout_msec(reactor)); if (n == 0) { if (reactor->onTimeout != NULL) { reactor->onTimeout(reactor); } SW_REACTOR_CONTINUE; } for (i = 0; iremoved) { handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type); ret = handle(reactor, &event); } if ((events[i].events & EPOLLOUT) && !event.socket->removed) { handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type); ret = handle(reactor, &event); } } } }
int Socket::readable_event_callback(swReactor *reactor, swEvent *event) { Socket *socket = (Socket *) event->socket->object; socket->read_co->resume(); }
while ((tmp = swHeap_top(timer->heap))) { tnode = tmp->data; if (tnode->exec_msec > now_msec || tnode->round == timer->round) { break; } timer->_current_id = tnode->id; if (!tnode->remove) { tnode->callback(timer, tnode); } …… } //该定时任务没有超时,需要更新需要更新_swTimer中最早的定时任务触发时间_next_msec long next_msec = tnode->exec_msec - now_msec; if (next_msec <= 0) { next_msec = 1; } //同时更新main_reactor对象的超时时间,实现函数为swReactorTimer_set timer->set(timer, next_msec);
void Socket::timer_callback(swTimer *timer, swTimer_node *tnode) { Socket *socket = (Socket *) tnode->data; socket->set_err(ETIMEDOUT); if (likely(tnode == socket->read_timer)) { socket->read_timer = nullptr; socket->read_co->resume(); } else if (tnode == socket->write_timer) { socket->write_timer = nullptr; socket->write_co->resume(); } }
Co::sleep()的实现函数为PHP_METHOD(swoole_coroutine_util, sleep),该函数通过调用Coroutine::sleep实现了协程休眠的功能:
int Coroutine::sleep(double sec) { Coroutine* co = Coroutine::get_current_safe(); if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL) { return -1; } co->yield(); return 0; }
可以看到,与socket读写事件超时处理相同,sleep内部实现时通过swTimer_add添加定时任务,同时换出当前协程实现的。该定时任务会导致main_reactor对象的超时时间的改变,即修改了epoll_wait的超时时间。
sleep的超时处理函数为sleep_timeout,只需要换入该阻塞协程对象即可,实现如下:
static void sleep_timeout(swTimer *timer, swTimer_node *tnode) { ((Coroutine *) tnode->data)->resume(); }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 我们