热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PHP多进程消费队列

原标题:PHP多进程消费队列引言最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了

原标题:PHP多进程消费队列

引言

最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理不过来了......

这种方式每次都要修改crontab,如果进程挂掉了,不会及时的启动,要等到下次crontab执行的时候才会启动。关闭(重启)进程的时候用的是kill,这可能会丢失正在处理的数据,比如下面这个例子,我们假设sleep过程就是处理逻辑,这里为了明显看出效果,将处理时间放大到10s:

$i = 1;
while (1) {
echo "开始第[{$i}]次循环\n";
sleep(10);
echo "结束第[{$i}]次循环\n";
$i++;
}

当我们运行脚本之后,等到循环开始之后,给进程发送kill {$pid},默认发送的是编号为15的SIGTERM信号。假设$i是从队列拿到的,拿到2的时候,正在处理,我们给程序发送了kill信号,和队列数据丢失一样,问题比较大,因此我要想办法解决这些问题。

开始第[1]次循环
结束第[1]次循环
开始第[2]次循环
[1] 28372 terminated php t.php

nginx进程模型

这时候我想到了nginx,nginx作为高性能服务器的中流砥柱,为成千上万的企业和个人服务,他的进程模型比较经典,如下所示:

nginx 进程模型 图片来自网络

管理员通过master进程和nginx进行交互,从/path/to/nginx.pid读取nginx master进程的pid,发送信号给master进程,master根据不同的信号做出不同的处理,然后反馈信息给管理员。worker是master进程fork出来的,master负责管理worker,不会去处理业务,worker才是具体业务的处理者,master可以控制w文章来源地址24825.htmlorker的退出、启动,当worker意外退出,master会收到子进程退出的消息,也会重新启www.yii666.com动新的worker进程补充上来,不让业务处理受影响。nginx还可以平滑退出,不丢失任何一个正在处理的数据,更新配置时nginx可以做到不影响线上服务来加载新的配置,这在请求量很大的时候特别有用。

进程设计

看了nginx的进模型,我们完全可以开发一个类似的类库来满足处理mcq数据的需求,做到单文件控制所有进程、可以平滑退出、可以查看子进程状态。不需要太复杂,因为我们处理队列数据接收一定的延迟,做到nginx那样不间断服务比较麻烦,费时费力,意义不是很大。设计的进程模型跟nginx类似,更像是nginx的简化版本。
多进程模型

进程信号量设计

信号量是进程间通讯的一种方式,比较简单,单功能也比较弱,只能发送信号给进程,进程根据信号做出不同的处理。

master进程启动的时候保存pid到文件/path/to/daeminze.pid,管理员通过信号和master进程通讯,master进程安装3种信号,碰到不同的信号,做出不同的处理,如下所示:

SIGINT => 平滑退出,处理完正在处理的数据再退出
SIGTERM => 暴力退出,无论进程是否正在处理数据直接退出
SIGUSR1 => 查看进程状态,查看进程占用内存,运行时间等信息

master进程通过信号和worker进程通讯,worker进程安装了2个信号,如下所示:

SIGINT => 平滑退出
SIGUSR1 => 查看worker进程自身状态

为什么worker进程只安装2个信号呢,少了个SIGTERM,因为master进程收到信号SIGTERM之后,向worker进程发送SIGKILL信号,默认强制关闭进程即可。

worker进程是通过master进程fork出来的,这样master进程可以通过pcntl_wait来等待子进程退出事件,当有子进程退出的时候返回子进程pid,做处理并启动新的进程补充上来。

master进程也通过pcntl_wait来等待接收信号,当有信号到达的时候,会返回-1,这个地方还有些坑,在下文中会详细讲。

PHP中有2种信号触发的方式,第一种方式是declare(ticks = 1);,这种效率不高,Zend每执行一次低级语句,都会去检查进程中是否有未处理的信号,现在已经很少使用了,PHP 5.3.0及之前的版本可能会用到这个。

第二种是通过pcntl_signal_dispatch来调用未处理的信号,PHP 5.4.0及之后的版本适用,可以巧妙的将该函数放在循环中,性能上基本没什么损失,现在推荐适用。

PHP安装修信号量

PHP通过pcntl_signal安装信号,函数声明如下所示:

bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )

第三个参数restart_syscalls不太好理解,找了很多资料,也没太查明白,经过试验发现,这个参数对pcntl_wait函数接收信号有影响,当设置为缺省值true的时候,发送信号,进程用pcntl_wait收不到,必须设置为false才可以,看看下面这个例子:

$i = 0;
while ($i<5) {
$pid = pcntl_fork();
$random = rand(10, 50);
if ($pid == 0) {
sleep($random);
exit();
}
echo "child {$pid} sleep {$random}\n";
$i++;
}
pcntl_signal(SIGINT, function($signo) {
echo "Ctrl + C\n";
});
while (1) {
$pid = pcntl_wait($status);
var_dump($pid);
pcntl_signal_dispatch();
}

运行之后,我们对父进程发送kill -SIGINT {$pid}信号,发现pcntl_wait没有反应,等到有子进程退出的时候,发送过的SIGINT会一个个执行,比如下面结果:

child 29643 sleep 48
child 29644 sleep 24
child 29645 sleep 37
child 29646 sleep 20
child 29647 sleep 31
int(29643)
Ctrl + C
Ctrl + C
Ctrl + C
Ctrl + C
int(29646)

这是运行脚本之后马上给父进程发送了四次SIGINT信号,等到一个子进程推出的时候,所有信号都会触发。

但当把安装信号的第三个参数设置为false

pcntl_signal(SIGINT, function($signo) {
echo "Ctrl + C\n";
}, false);

这时候给父进程发送SIGINT信号,pcntl_wait会马上返回-1,信号对应的事件也会触发。

所以第三个参数大概意思就是,是否重新注册此信号,如果为false只注册一次,触发之后就返回,pcntl_wait就能收到消息,如果为true,会重复注册,不会返回,pcntl_wait收不到消息。

信号量和系统调用

信号量会打断系统调用,让系统调用立刻返回,比如sleep,当进程正在sleep的时候,收到信号,sleep会马上返回剩余sleep秒数,比如:

pcntl_signal(SIGINT, function($signo) {
echo "Ctrl + C\n";
}, false);
while (true) {
pcntl_signal_dispatch();
echo "123\n";
$limit = sleep(2);
echo "limit sleep [{$limit}] s\n";
}

运行之后,按Ctrl + C,结果如下所示:

123
^Climit sleep [1] s
Ctrl + C
123
limit sleep [0] s
123
^Climit sleep [1] s
Ctrl + C
123
^Climi文章来源地址24825.htmlt sleep [2] s

daemon(守护)进程

这种进程一般设计为daemon进程,不受终端控制,不与终端交互,长时间运行在后台,而对于一个进程,我们可以通过下面几个步骤把他升级为一个标准的daemon进程:

protected function daemonize()
{
$pid = pcntl_fork();
if (-1 == $pid) {
throw new Exception("fork进程失败");
} elseif ($pid != 0) {
exit(0);
}
if (-1 == posix_setsid()) {
throw new Exception("新建立session会话失败");
}
$pid = pcntl_fork();
if (-1 == $pid) {
throw new Exception("fork进程失败");
} else if($pid != 0) {
exit(0);
}
umask(0);
chdir("/");
}

拢共分五步:


  1. fork子进程,父进程退出。

  2. 设置子进程为会话组长,进程组长。

  3. 再次fork,父进程退出,子进程继续运行。

  4. 恢复文件掩码为0

  5. 切换当前目录到根目录/

第2步是为第1步做准备,设置进程为会话组长,必要条件是进程非进程组长,因此做第一次fork,进程组长(父进程)退出,子进程通过posix_setsid()设置为会话组长,同时也为进程组长。

第3步是为了不让进程重新控制终端,因为一个进程控制一个终端的必要条件是会话组长(pid=sid)。

第4步是为了恢复默认的文件掩码,避免之前做的操作对文件掩码做了设置,带来不必要的麻烦。关于文件掩码, linux中,文件掩码在创建文件、文件夹的时候会用到,文件的默认权限为666,文件夹为777,创建文件(夹)的时候会用默认值减去掩码的值作为创建文件(夹)的最终值,比如掩码022下创建文件666 - 222 = 644,创建文件夹777 - 022 = 755



















掩码新建文件权限新建文件夹权限
umask(0)666 (-rw-rw-rw-)777 (drwxrwxrwx)
umask(022)644 (-rw-r--r--)755 (drwxr-xr-x)

第5步是切换了当前目录文章来源站点https://www.yii666.com/到根目录/,网上说避免起始运行他的目录不能被正确卸载,这个不是太了解。

对应5步,每一步的各种id变化信息:







































操作后pidppidpgidsid
开始17723313811772331381
第一次fork1772311772331381
posix_setsid()1774011774017740
第二次fork1784011774017740

另外,会话、进程组、进程的关系如下图所示,这张图有助于更好的理解。
会话、进程组、进程关系

至此,你也可以轻松地造出一个daemon进程了。

命令设计

我准备给这个类库设计6个命令,如下所示:


  1. start 启动命令

  2. restart 强制重启

  3. stop 平滑停止

  4. reload 平滑重启

  5. quit 强制停止

  6. status 查看进程状态


启动命令

启动命令就是默认的流程,按照默认流程走就是启动命令,启动命令会检测pid文件中是否已经有pid,pid对应的进程是否健康,是否需要重新启动。

强制停止命令

管理员通过入口文件结合pid给master进程发送SIGTERM信号,master进程给所有子进程发送SIGKILL信号,等待所有worker进程退出后,master进程也退出。

强制重启命令

强制停止命令+启动命令

平滑停止命令

平滑停止命令,管理员给master进程发送SIGINT信号,master进程给所有子进程发送SIGINT,worker进程将自身状态标记为stoping,当worker进程下次循环的时候会根据stoping决定停止,不在接收新的数据,等所有worker进程退出之后,master进程也退出。

平滑重启命令

平滑停止命令+启动命令

查看进程状态

查看进程状态这个借鉴了workerman的思路,管理员给master进程发送SIGUSR1信号,告诉主进程,我要看所有进程的信息,master进程,master进程将自身的进程信息写入配置好的文件路径A中,然后发送SIGUSR1,告诉worker进程把自己的信息也写入文件A中,由于这个过程是异步的,不知道worker进程啥时候写完,所以master进程在此处等待,等所有worker进程都写入文件之后,格式化所有的信息输出,最后输出的内容如下所示:

➜/dir /usr/local/bin/php DaemonMcn.php status
Daemon [DaemonMcn] 信息:
-------------------------------- master进程状态 --------------------------------
pid 占用内存 www.yii666.com 处理次数 开始时间 运行时间
16343 0.75M -- 2018-05-15 09:42:45 0 0 3
12 slaver
-------------------------------- slaver进程状态 --------------------------------
任务task-mcq:
16345 0.75M 236 2018-05-15 09:42:45 0 0 3
16346 0.75M 236 2018-05-15 09:42:45 0 0 3
--------------------------------------------------------------------------------
任务test-mcq:
16348 0.75M 49 2018-05-15 09:42:45 0 0 3
16350 0.75M 49 2018-05-15 09:42:45 0 0 3
16358 0.75M 49 2018-05-15 09:42:45 0 0 3
16449 0.75M 1 2018-05-15 09:46:40 0 0 0
--------------------------------------------------------------------------------

等待worker进程将进程信息写入文件的时候,这个地方用了个比较trick的方法,每个worker进程输出一行信息,统计文件的行数,达到worker进程的行数之后表示所有worker进程都将信息写入完毕,否则,每个1s检测一次。

来源于:PHP多进程消费队列


推荐阅读
  • 现在需要用到php(现在可以用)
    本文目录一览:1、现在在工作中PHP用到的多么? ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 之前项目在windows2003服务器上设置定时任务,每天执行。现在把项目移植到linux系统。也要在linux系统上设置计划任务。但是之前我从来没有做过。所以多得不 ... [详细]
  • 本文转载自:https:blog.csdn.netu924512005articledetails70655272详细了解FPGAselectIO是学习FPGA基础 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 本文介绍了如何将CIM_DateTime解析为.Net DateTime,并分享了解析过程中可能遇到的问题和解决方法。通过使用DateTime.ParseExact方法和适当的格式字符串,可以成功解析CIM_DateTime字符串。同时还提供了关于WMI和字符串格式的相关信息。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
author-avatar
一千万223
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有