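- This article assumes basic knowledge of C++ and multi-process programming.
- Swoole processes can communicate over sockets (swoole_process::write / swoole_process::read) or over a message queue (push/pop). This article covers only socket communication.
- The swoole source quoted here is version 1.9.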
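1. What do swoole_process's __construct and start actually do?
To see how swoole processes communicate over Unix sockets, we start from the source and look at what __construct and start actually do. Only the parts relevant to this question are quoted.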
1.1 __construct
swoole_process.c
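```c
static PHP_METHOD(swoole_process, __construct)
{
    long pipe_type = 2;  /* default value */
    /* ... zend_parse_parameters() (the 3rd argument may override pipe_type)
       and creation of swWorker *process omitted ... */
    if (pipe_type > 0)
    {
        swPipe *_pipe = emalloc(sizeof(swPipe));
        /* 1 => SOCK_STREAM, any other positive value => SOCK_DGRAM */
        int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
        if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
        {
            RETURN_FALSE;
        }
        process->pipe_object = _pipe;
        process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
        process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);
        /* in the parent, the object reads and writes through the master end */
        process->pipe = process->pipe_master;
        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC);
    }
}
```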
What does swPipeUnsock_create do?
pipe/PipeUnsock.c
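```c
int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{
    int ret;
    /* ... allocation of swPipeUnsock *object omitted ... */
    p->blocking = blocking;
    /* protocol is SOCK_STREAM or SOCK_DGRAM and becomes socketpair()'s type argument */
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    else
    {
        int sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(object->socks[0], sbsize);
        swSocket_set_buffer_size(object->socks[1], sbsize);
        /* ... p->object, p->getFd and the other callbacks are set here (omitted) ... */
    }
    return 0;
}
```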
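Stripped of swoole's wrappers, swPipeUnsock_create amounts to one socketpair() call plus buffer resizing. The sketch below shows that core in plain C. It assumes that swSocket_set_buffer_size sets SO_SNDBUF and SO_RCVBUF, and `unsock_pair_create` / `unsock_pair_selftest` are hypothetical names, not swoole functions.

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>

/* Hypothetical stand-in for swPipeUnsock_create(): create a connected pair of
 * AF_UNIX sockets of the given type (SOCK_STREAM or SOCK_DGRAM) and try to
 * enlarge both buffers. Returns 0 on success, -1 on failure. */
static int unsock_pair_create(int socks[2], int type, int bufsize)
{
    if (socketpair(AF_UNIX, type, 0, socks) < 0) {
        fprintf(stderr, "socketpair() failed. Error: %s [%d]\n", strerror(errno), errno);
        return -1;
    }
    for (int i = 0; i < 2; i++) {
        /* assumption: roughly what swSocket_set_buffer_size() does;
         * a failed resize is ignored and the kernel default is kept */
        setsockopt(socks[i], SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
        setsockopt(socks[i], SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
    }
    return 0;
}

/* Round trip: what is written on one end can be read on the other. */
static int unsock_pair_selftest(int type)
{
    int socks[2];
    char buf[16];
    if (unsock_pair_create(socks, type, 256 * 1024) < 0)
        return -1;
    ssize_t w = write(socks[0], "ping", 4);
    ssize_t r = read(socks[1], buf, sizeof(buf));
    close(socks[0]);
    close(socks[1]);
    return (w == 4 && r == 4 && memcmp(buf, "ping", 4) == 0) ? 0 : -1;
}
```

The two descriptors are symmetric, so neither end is inherently the "master". That label is just a convention the caller adopts.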
1.2 start
swoole_process.c
```c
static PHP_METHOD(swoole_process, start)
{
    swWorker *process = swoole_get_object(getThis());
    pid_t pid = fork();
    if (pid < 0)
    {
        swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    else if (pid > 0)
    {
        process->pid = pid;
        process->child_process = 0;
        zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
        RETURN_LONG(pid);
    }
    else
    {
        process->child_process = 1;
        SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
    }
    RETURN_TRUE;
}
```
The child-side logic runs in php_swoole_process_start(), which we look at next:
swoole_process.c
```c
int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
{
    /* in the child, the object switches to the worker end of the pair */
    process->pipe = process->pipe_worker;
    process->pid = getpid();
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);
    /* ... remainder omitted (e.g. running the user callback) ... */
}
```
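In short:
- __construct mainly creates a pair of sockets with socketpair() and has the parent's swoole_process object read and write through one of them (pipe_master).
- start mainly forks the child and has the child's copy of the swoole_process object read and write through the other one (pipe_worker).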
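The C model below is stripped down to show this division of labor. It is not swoole code. `struct proc`, `proc_construct`, `proc_start_echo` and `proc_roundtrip` are hypothetical names, and treating `socks[0]` as the master end is an arbitrary choice. The "constructor" points `pipe` at the master end, the child switches to the worker end right after fork(), and the two sides exchange one message over SOCK_DGRAM (swoole's default).

```c
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Hypothetical, stripped-down model of a swoole_process object. */
struct proc {
    int pipe_master;   /* end meant for the parent */
    int pipe_worker;   /* end meant for the child */
    int pipe;          /* end this copy of the object actually uses */
    pid_t pid;
};

/* Like __construct: create the pair and default `pipe` to the master end. */
static int proc_construct(struct proc *p)
{
    int socks[2];
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socks) < 0)
        return -1;
    p->pipe_master = socks[0];
    p->pipe_worker = socks[1];
    p->pipe = p->pipe_master;
    p->pid = 0;
    return 0;
}

/* Like start: fork. The child switches `pipe` to the worker end, echoes one
 * message back and exits; the parent records and returns the child's pid. */
static pid_t proc_start_echo(struct proc *p)
{
    pid_t pid = fork();
    if (pid != 0) {             /* parent, or fork failure (pid < 0) */
        if (pid > 0)
            p->pid = pid;
        return pid;
    }
    p->pipe = p->pipe_worker;   /* child: from now on use the worker end */
    char buf[64];
    ssize_t n = read(p->pipe, buf, sizeof(buf));
    _exit(n > 0 && write(p->pipe, buf, (size_t)n) == n ? 0 : 1);
}

/* Parent side: write `msg` through `pipe` (still the master end) and read
 * the echo. Returns 0 if the echo matches. */
static int proc_roundtrip(const char *msg)
{
    struct proc p;
    char buf[64];
    size_t len = strlen(msg);
    if (len > sizeof(buf) || proc_construct(&p) < 0)
        return -1;
    int ok = proc_start_echo(&p) > 0
          && write(p.pipe, msg, len) == (ssize_t)len
          && read(p.pipe, buf, sizeof(buf)) == (ssize_t)len
          && memcmp(buf, msg, len) == 0;
    if (p.pid > 0)
        waitpid(p.pid, NULL, 0);
    close(p.pipe_master);
    close(p.pipe_worker);
    return ok ? 0 : -1;
}
```

After fork() both processes hold both descriptors. Only the value of `pipe` decides which end each side uses, which mirrors how swoole switches `process->pipe` in the child.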
2. How the communication works
2.1 The read/write source
swoole_process.c
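```c
static PHP_METHOD(swoole_process, read)
{
    long buf_size = 8192;  /* default; can be overridden by read()'s argument */
    /* ... zend_parse_parameters() omitted ... */
    if (buf_size > 65536)
    {
        buf_size = 65536;  /* hard cap on a single read */
    }
    swWorker *process = swoole_get_object(getThis());
    char *buf = emalloc(buf_size + 1);
    /* one read() on this process's end of the socket pair */
    int ret = read(process->pipe, buf, buf_size);
    if (ret < 0)
    {
        efree(buf);
        RETURN_FALSE;
    }
    buf[ret] = 0;
    SW_ZVAL_STRINGL(return_value, buf, ret, 0);
}
```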
swoole_process.c
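```c
static PHP_METHOD(swoole_process, write)
{
    /* ... parsing of data/data_len and lookup of swWorker *process omitted ... */
    int ret;
    /* in both cases below, the data is ultimately written to process->pipe with write() */
    // async write (an event loop is running)
    if (SwooleG.main_reactor)
    {
        ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
    }
    else
    {
        // blocking write
        ret = swSocket_write_blocking(process->pipe, data, data_len);
    }
    ZVAL_LONG(return_value, ret);
}
```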
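The blocking branch calls swSocket_write_blocking(), whose body is not quoted here. The following is only a sketch of what such a helper typically looks like, not swoole's implementation (`write_all_blocking` and `write_all_demo` are hypothetical names). It calls write() until every byte has gone out, retries on EINTR, and waits with poll() when a non-blocking socket reports EAGAIN. On a SOCK_DGRAM socket a write() is all-or-nothing, so the loop only matters for SOCK_STREAM.

```c
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>

/* Keep calling write() until all len bytes are sent.
 * Returns the number of bytes written, or -1 on a hard error. */
static ssize_t write_all_blocking(int fd, const void *data, size_t len)
{
    const char *p = data;
    size_t written = 0;
    while (written < len) {
        ssize_t n = write(fd, p + written, len - written);
        if (n < 0) {
            if (errno == EINTR)
                continue;                 /* interrupted by a signal: retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                /* non-blocking fd with a full buffer: wait up to 1s for POLLOUT */
                struct pollfd pfd = { .fd = fd, .events = POLLOUT };
                poll(&pfd, 1, 1000);
                continue;
            }
            return -1;
        }
        written += (size_t)n;
    }
    return (ssize_t)written;
}

/* Usage: push 1000 bytes through a SOCK_STREAM pair and read them back. */
static int write_all_demo(void)
{
    int sv[2];
    char out[1000], in[1000];
    size_t got = 0;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    memset(out, 'x', sizeof(out));
    if (write_all_blocking(sv[0], out, sizeof(out)) != (ssize_t)sizeof(out))
        return -1;
    while (got < sizeof(in)) {            /* a stream read may return fewer bytes */
        ssize_t n = read(sv[1], in + got, sizeof(in) - got);
        if (n <= 0)
            return -1;
        got += (size_t)n;
    }
    close(sv[0]);
    close(sv[1]);
    return memcmp(in, out, sizeof(out)) == 0 ? 0 : -1;
}
```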
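2.2 Summary
Socket communication between swoole processes works as follows:
1. The parent creates a pair of connected sockets with socketpair().
2. When the child is forked, it inherits both sockets.
3. Parent and child each read from and write to their own end (pipe_master in the parent, pipe_worker in the child) with the ordinary read/write system calls.
4. With several children, there is one socket pair per child: each swoole_process object creates its own pair in __construct.
5. Communication between children, e.g. A sending to B, works because A, when forked, inherits from the parent the master end of B's socket pair and can write to B through it directly. This requires B's swoole_process object to exist before A is forked.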
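Point 5 is the least obvious, so here is a minimal C sketch of it. `sibling_send` is a hypothetical name, and SOCK_DGRAM is used to match swoole's default. The parent creates B's pair and forks B, which reads from the worker end. It then forks A, which writes into the master end it inherited. The parent itself never touches the message.

```c
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Returns 0 if B received `msg` intact from A, nonzero/-1 otherwise. */
static int sibling_send(const char *msg)
{
    int b_socks[2];   /* [0] = B's master end, [1] = B's worker end */
    size_t len = strlen(msg);
    if (len > 256 || socketpair(AF_UNIX, SOCK_DGRAM, 0, b_socks) < 0)
        return -1;

    pid_t b = fork();                         /* process B */
    if (b < 0)
        return -1;
    if (b == 0) {
        char buf[256];
        ssize_t n = read(b_socks[1], buf, sizeof(buf));
        _exit(n == (ssize_t)len && memcmp(buf, msg, len) == 0 ? 0 : 1);
    }

    pid_t a = fork();                         /* process A inherits b_socks too */
    if (a < 0)
        return -1;
    if (a == 0) {
        ssize_t n = write(b_socks[0], msg, len);  /* A -> B directly */
        _exit(n == (ssize_t)len ? 0 : 1);
    }

    int status_a = 0, status_b = 0;
    waitpid(a, &status_a, 0);
    waitpid(b, &status_b, 0);
    close(b_socks[0]);
    close(b_socks[1]);
    if (!WIFEXITED(status_a) || WEXITSTATUS(status_a) != 0)
        return -1;
    return WIFEXITED(status_b) ? WEXITSTATUS(status_b) : -1;
}
```

If A were forked before B's pair existed, A would have no descriptor for B at all, which is why the construction order matters.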
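3. SOCK_STREAM vs SOCK_DGRAM
3.1 An error in the manual
The manual says the default mode is stream. The __construct source in section 1.1, however, shows that the default is SOCK_DGRAM: pipe_type defaults to 2.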
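3.2 How SOCK_STREAM and SOCK_DGRAM differ
This setting is passed as the third argument of __construct (1 selects SOCK_STREAM, 2 selects SOCK_DGRAM). It ends up as the type argument of socketpair(), which swPipeUnsock_create stores in a variable named protocol: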
```c
ret = socketpair(AF_UNIX, protocol, 0, object->socks);
```
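You can confirm which type a given pipe_type produces by asking the kernel with getsockopt(SO_TYPE). In the sketch below, only the ternary is taken from the quoted source. `pipe_type_to_socktype` and `actual_socktype` are hypothetical helper names.

```c
#include <unistd.h>
#include <sys/socket.h>

/* Same mapping as in __construct: 1 => SOCK_STREAM, any other positive
 * value (the default is 2) => SOCK_DGRAM. */
static int pipe_type_to_socktype(long pipe_type)
{
    return pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
}

/* Create a pair for the given pipe_type and return the socket type the
 * kernel reports for it, or -1 on error. */
static int actual_socktype(long pipe_type)
{
    int sv[2], type = -1;
    socklen_t optlen = sizeof(type);
    if (socketpair(AF_UNIX, pipe_type_to_socktype(pipe_type), 0, sv) < 0)
        return -1;
    if (getsockopt(sv[0], SOL_SOCKET, SO_TYPE, &type, &optlen) < 0)
        type = -1;
    close(sv[0]);
    close(sv[1]);
    return type;
}
```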
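In general, SOCK_STREAM and SOCK_DGRAM are associated with TCP and UDP respectively. The former is ordered (first sent, first delivered) and reliable, while the latter guarantees neither order nor delivery. Unix domain sockets, however, connect two processes on the same machine. On Linux, as on most UNIX implementations, Unix domain datagram sockets are reliable and do not reorder datagrams. So where does the difference lie?
Here is a very clear explanation I came across: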
The difference between SOCK_STREAM and SOCK_DGRAM is in the semantics of consuming data out of the socket.
Stream socket allows for reading arbitrary number of bytes, but still preserving byte sequence. In other words, a sender might write 4K of data to the socket, and the receiver can consume that data byte by byte. The other way around is true too - sender can write several small messages to the socket that the receiver can consume in one read. Stream socket does not preserve message boundaries.
Datagram socket, on the other hand, does preserve these boundaries - one write by the sender always corresponds to one read by the receiver (even if receiver’s buffer given to read(2) or recv(2) is smaller then that message).
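In other words, SOCK_STREAM is a byte stream with no message boundaries. Data from several writes may come back in a single read, and data from a single write may take several reads. For swoole, this means that write and read calls do not correspond one to one in this mode, so you must delimit messages yourself.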
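Length-prefix framing is one common way to delimit messages on a stream socket. The sketch below uses a 4-byte big-endian length header. The frame format and all function names (`write_exact`, `read_exact`, `send_msg`, `recv_msg`, `framing_demo`) are hypothetical choices for illustration. A delimiter byte such as '\n' would work too, as long as it cannot appear inside a message.

```c
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

/* Loop until exactly len bytes are written: a stream write may be partial. */
static int write_exact(int fd, const void *buf, size_t len)
{
    const char *p = buf;
    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n <= 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Loop until exactly len bytes are read: a stream read may return fewer. */
static int read_exact(int fd, void *buf, size_t len)
{
    char *p = buf;
    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n <= 0)
            return -1;               /* error, or peer closed mid-frame */
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Frame = 4-byte big-endian payload length, then the payload. */
static int send_msg(int fd, const char *msg, uint32_t len)
{
    uint32_t hdr = htonl(len);
    if (write_exact(fd, &hdr, sizeof(hdr)) < 0)
        return -1;
    return write_exact(fd, msg, len);
}

/* Returns the payload length, or -1 on error or if the frame exceeds cap. */
static long recv_msg(int fd, char *out, uint32_t cap)
{
    uint32_t hdr;
    if (read_exact(fd, &hdr, sizeof(hdr)) < 0)
        return -1;
    uint32_t len = ntohl(hdr);
    if (len > cap || read_exact(fd, out, len) < 0)
        return -1;
    return (long)len;
}

/* Three framed writes come back as three separate messages. */
static int framing_demo(void)
{
    const char *msg = "hello 1 i'm 2;";
    uint32_t len = (uint32_t)strlen(msg);
    char buf[64];
    int sv[2], ok = 1;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    for (int i = 0; i < 3; i++)
        ok = ok && send_msg(sv[0], msg, len) == 0;
    for (int i = 0; i < 3; i++)
        ok = ok && recv_msg(sv[1], buf, sizeof(buf)) == (long)len
                && memcmp(buf, msg, len) == 0;
    close(sv[0]);
    close(sv[1]);
    return ok ? 0 : -1;
}
```

Because recv_msg never reads past the end of the current frame, it does not matter how the kernel groups the bytes of consecutive frames.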
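With SOCK_DGRAM, data has built-in boundaries and each write corresponds to exactly one read. For swoole, this means that as long as a single message fits into one read (8192 bytes by default, at most 65536), you do not need delimiters of your own. A longer message is not split across reads: the bytes beyond the read buffer are discarded.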
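Both properties can be observed directly in C. The helper names below are hypothetical, and the expected values reflect Linux behavior. Three 3-byte writes followed by one large read return all 9 bytes on a stream pair, but only the first 3-byte message on a datagram pair. A datagram read with a buffer that is too small returns the truncated head, and the next read starts at the next datagram.

```c
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>

/* Write "abc" three times, then return what a single 64-byte read() gets. */
static ssize_t one_read_after_three_writes(int type)
{
    int sv[2];
    char buf[64];
    if (socketpair(AF_UNIX, type, 0, sv) < 0)
        return -1;
    for (int i = 0; i < 3; i++)
        if (write(sv[0], "abc", 3) != 3)
            return -1;
    ssize_t n = read(sv[1], buf, sizeof(buf));
    close(sv[0]);
    close(sv[1]);
    return n;
}

/* SOCK_DGRAM: send a 10-byte datagram, then a 3-byte one. Read the first with
 * a 4-byte buffer (the other 6 bytes of that datagram are dropped), then read
 * the second datagram normally. */
static int dgram_truncation(ssize_t *first, ssize_t *second)
{
    int sv[2];
    char small[4], buf[64];
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0)
        return -1;
    if (write(sv[0], "0123456789", 10) != 10 || write(sv[0], "xyz", 3) != 3)
        return -1;
    *first = read(sv[1], small, sizeof(small));
    *second = read(sv[1], buf, sizeof(buf));
    close(sv[0]);
    close(sv[1]);
    return 0;
}
```

In swoole terms, with SOCK_DGRAM a message longer than read()'s buf_size is silently cut off rather than delivered over several reads.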
Here is an example:
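```php
<?php
// process1: reads from its pipe forever and reports each read
$process1 = new swoole_process(function ($process) {
    $i = 1;
    while (true) {
        $msg = $process->read();
        echo $msg, "\n";
        echo "read $i time\n";
        $i++;
    }
}, false, 1);  // 3rd argument: 1 = SOCK_STREAM, 2 = SOCK_DGRAM

$num = 10;
// process2: writes $num messages into process1's pipe (the master end inherited from the parent)
$process2 = new swoole_process(function ($process) use ($process1, $num) {
    for ($i = 0; $i < $num; $i++) {
        $process1->write("hello 1 i'm 2;");
        if ($i % 5 == 0) {
            sleep(1);
        }
    }
});

$process2->start();
$process1->start();
```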
With the third argument of `new` set to 1 (SOCK_STREAM), the output was:
```
hello 1 i'm 2;
read 1 time
hello 1 i'm 2;hello 1 i'm 2;hello 1 i'm 2;
read 2 time
hello 1 i'm 2;hello 1 i'm 2;
read 3 time
hello 1 i'm 2;hello 1 i'm 2;
read 4 time
hello 1 i'm 2;hello 1 i'm 2;
read 5 time
```
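process2 wrote to process1 10 times, and process1 consumed all of it in 5 reads. The exact grouping depends on process scheduling and can vary from run to run.
With the third argument set to 2 (SOCK_DGRAM), the output was: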
```
hello 1 i'm 2;
read 1 time
hello 1 i'm 2;
read 2 time
hello 1 i'm 2;
read 3 time
hello 1 i'm 2;
read 4 time
hello 1 i'm 2;
read 5 time
hello 1 i'm 2;
read 6 time
hello 1 i'm 2;
read 7 time
hello 1 i'm 2;
read 8 time
hello 1 i'm 2;
read 9 time
hello 1 i'm 2;
read 10 time
```
The 10 writes correspond to exactly 10 reads.