热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

swoole启动流程_Swoole+Redis实现并发队列处理

背景由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务

背景

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

具体的工作流程如下:

1、主系统将需要需要处理的任务名称+任务参数push到队列中。

2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

具体代码如下:

/*** 启动守护进程*/
public function runAction() {Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');while (true) {$this->fork_process();}exit;
}/*** 创建子进程*/
private function fork_process() {$ppid = getmypid();$pid = pcntl_fork();if ($pid == 0) {//子进程$pid = posix_getpid();//echo "* Process {$pid} was created nn";$this->mq_process();exit;} else {//主进程$pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态if (pcntl_wifexited($status)) {//echo "nn* Sub process: {$pid} exited with {$status}";//Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );} else {Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');}}
}/*** 业务任务队列处理*/
private function mq_process() {$data_pop = $this->masterRedis->rPop($this->redis_list_key);$data = json_decode($data_pop, 1);if (!$data) {return FALSE;}$worker = '_task_' . $data['worker'];$class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';$params = $data['params'];$class = new $class_name();$class->$worker($params);return TRUE;
}

这是一个简单的任务处理系统。通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过,这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!第一个问题还好,但第二个问题就很严重。当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

新的设计

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在PHP7之前不支持多线程,所以我们采用多进程。

找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。

显然这是不合适的。

我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

遇到的问题

1、如何控制最大进程数

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行Google,这里直接看代码。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

declare是一个控制结构语句,具体的用法也请去Google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

2、如何解决进程残留

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于Redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

新系统的评估

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用Swoole扩展中的Process。

具体代码如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{use Trait_Redis;private $maxProcesses &#61; 800;private $child;private $masterRedis;private $redis_task_wing &#61; &#39;task:wing&#39;; //待处理队列public function init(){// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));set_time_limit(0);ini_set(&#39;default_socket_timeout&#39;, -1); //队列处理不超时,解决redis报错:read error on connection}private function redis_client(){$rds &#61; new Redis();$rds->connect(&#39;redis.master.host&#39;,6379);return $rds;}public function process(swoole_process $worker){// 第一个处理$GLOBALS[&#39;worker&#39;] &#61; $worker;swoole_event_add($worker->pipe, function($pipe) {$worker &#61; $GLOBALS[&#39;worker&#39;];$recv &#61; $worker->read(); //send data to mastersleep(rand(1, 3));echo "From Master: $recvn";$worker->exit(0);});exit;}public function testAction(){for ($i &#61; 0; $i <10000; $i&#43;&#43;){$data &#61; [&#39;abc&#39; &#61;> $i,&#39;timestamp&#39; &#61;> time().rand(100,999)];$this->masterRedis->lpush($this->redis_task_wing, json_encode($data));}exit;}public function runAction(){while (1){
// echo "t now we de have $this->child child processesn";if ($this->child <$this->maxProcesses){$rds &#61; $this->redis_client();$data_pop &#61; $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待if (!$data_pop){continue;}echo "t Starting new child | now we de have $this->child child processesn";$this->child&#43;&#43;;$process &#61; new swoole_process([$this, &#39;process&#39;]);$process->write(json_encode($data_pop));$pid &#61; $process->start();}}}private function sig_handler($signo) {
// echo "Recive: $signo rn";switch ($signo) {case SIGCHLD:while($ret &#61; swoole_process::wait(false)) {
// echo "PID&#61;{$ret[&#39;pid&#39;]}n";$this->child--;}}}
}

最终&#xff0c;经过测试&#xff0c;单核1G的服务器执行1到3秒的任务可以做到800的并发。

如果你喜欢我写的技术文章以及面试总结&#xff0c;欢迎关注收看我的视频&#xff0c;并且点赞、收藏、关注我哦。

我是luke&#xff0c;感谢你的关注&#xff01;

很多小伙伴在进阶的时候总会遇到一些问题和瓶颈&#xff0c;业务代码写多了没有方向感&#xff0c;不知道该从那里入手去提升&#xff0c;对此我整理了一些资料&#xff0c;希望能够去帮助到小伙伴们&#xff0c;可以关注我。并且加入到我的圈子一起学习成长哦【架构师之路】点击链接申请加入圈子

架构师之路 - 知乎​www.zhihu.com
4435c926c25d76e63f83bdd4f7061997.png



推荐阅读
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 如何查询zone下的表的信息
    本文介绍了如何通过TcaplusDB知识库查询zone下的表的信息。包括请求地址、GET请求参数说明、返回参数说明等内容。通过curl方法发起请求,并提供了请求示例。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 本文介绍了如何使用JSONObiect和Gson相关方法实现json数据与kotlin对象的相互转换。首先解释了JSON的概念和数据格式,然后详细介绍了相关API,包括JSONObject和Gson的使用方法。接着讲解了如何将json格式的字符串转换为kotlin对象或List,以及如何将kotlin对象转换为json字符串。最后提到了使用Map封装json对象的特殊情况。文章还对JSON和XML进行了比较,指出了JSON的优势和缺点。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 本文介绍了如何使用jQuery和AJAX来实现动态更新两个div的方法。通过调用PHP文件并返回JSON字符串,可以将不同的文本分别插入到两个div中,从而实现页面的动态更新。 ... [详细]
  • express工程中的json调用方法
    本文介绍了在express工程中如何调用json数据,包括建立app.js文件、创建数据接口以及获取全部数据和typeid为1的数据的方法。 ... [详细]
  • 【宇润日常疯测007】Swoole 协程与传统 fpm 同步模式比较
    为什么80%的码农都做不了架构师?如果说数组是PHP的精髓,数组玩得不6的,根本不能算是会用PHP。那协程对于Swoole也是同理& ... [详细]
  • PHP socket服务端与客户端的简易通信
    今天学习socket通信的同时,顺便整理了下以前初识socket的知识。现在关于php的socket通信,有些框架已经十分成熟了,比如swoole和workerman,这两个大家可以学习学 ... [详细]
author-avatar
fuxw
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有