热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PHP并发编程之MasterWorker模式

Master-Worker的模式结构Master进程为主进程,它维护了一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程,不停地从任务队列中提






Master-Worker 的模式结构

Master 进程为主进程,它维护了一个 Worker 进程队列、子任务队列和子结果集。Worker 进程队列中的 Worker 进程,不停地从任务队列中提取要处理的子任务,并将子任务的处理结果写入结果集。


  • 使用多进程

  • 支持 Worker 错误重试,仅仅实现业务即可

  • 任务累积过多,自动 Fork Worker 进程

  • 常驻 Worker 进程,减少进程 Fork 开销

  • 非常驻 Worker 进程闲置,自动退出回收

  • 支持日志

Demo: 基于 Redis 生产消费队列 在 test 目录中

代码:

declare(ticks = 1);
// 必须先使用语句declare(ticks=1),否则注册的singal-handel就不会执行了
//error_reporting(E_ERROR);
abstract class MasterWorker
{
// 子进程配置属性
protected $maxWorkerNum; // 最多只能开启进程数
protected $minWorkerNum; // 最少常驻子进程数
protected $waitTaskTime; // 等待任务时间,单位秒
protected $waitTaskLoopTimes; // 连续这么多次队列为空就退出子进程
protected $consumeTryTimes; // 连续消费失败次数
// 父进程专用属性
protected $worker_list = [];
protected $check_internal = 1;
protected $masterExitCallback = [];
// 子进程专用属性
protected $autoQuit = false;
protected $status = self::WORKER_STATUS_IDLE;
protected $taskData; // 任务数据
protected $workerExitCallback = [];
// 通用属性
protected $stop_service = false;
protected $master = true;
// 通用配置
protected $logFile;
const WORKER_STATUS_IDLE = 'idle';
const WORKER_STATUS_FINISHED = 'finished';
const WORKER_STATUS_EXITING = 'exiting';
const WORKER_STATUS_WORKING = 'working';
const WORKER_STATUS_FAIL = 'fail';
const WORKER_STATUS_TERMINATED = 'terminated';
public function __construct($optiOns= [])
{
$this->initConfig($options);
}
protected function initConfig($optiOns= [])
{
$defaultCOnfig= [
'maxWorkerNum' => 10,
'minWorkerNum' => 3,
'waitTaskTime' => 0.01,
'waitTaskLoopTimes' => 50,
'consumeTryTimes' => 3,
'logFile' => './master_worker.log',
];
foreach ($defaultConfig as $key => $default) {
$this->$key = array_key_exists($key, $options) ? $options[$key] : $default;
}
}
public function start()
{
// 父进程异常,需要终止子进程
set_exception_handler([$this, 'exceptionHandler']);
// fork minWorkerNum 个子进程
$this->mutiForkWorker($this->minWorkerNum);
if ($this->getWorkerLength() <= 0) {
$this->masterWaitExit(true, 'fork 子进程全部失败');
}
// 父进程监听信号
pcntl_signal(SIGTERM, [$this, 'sig_handler']);
pcntl_signal(SIGINT, [$this, 'sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'sig_handler']);
pcntl_signal(SIGCHLD, [$this, 'sig_handler']);
// 监听队列,队列比进程数多很多,则扩大进程,扩大部分的进程会空闲自动退出
$this->checkWorkerLength();
$this->masterWaitExit();
}
/**
* Master 等待退出
*
* @param boolean $force 强制退出
* @param string $msg 退出 message
* @return void
*/
protected function masterWaitExit($force = false, $msg = '')
{
// 强制发送退出信号
$force && $this->sig_handler(SIGTERM);
// 等到子进程退出
while ($this->stop_service) {
$this->checkExit($msg);
$this->msleep($this->check_internal);
}
}
protected function log($msg)
{
try {
$header = $this->isMaster() ? 'Master [permanent]' : sprintf('Worker [%s]', $this->autoQuit ? 'temporary' : 'permanent');
$this->writeLog($msg, $this->getLogFile(), $header);
} catch (\Exception $e) {
}
}
protected function mutiForkWorker($num, $autoQuit = false, $maxTryTimes = 3)
{
for ($i = 1; $i <= $num; ++$i) {
$this->forkWorker($autoQuit, $maxTryTimes);
}
}
protected function checkWorkerLength()
{
// 如果要退出父进程,就不执行检测
while (! $this->stop_service) {
$this->msleep($this->check_internal);
// 处理进程
$workerLength = $this->getWorkerLength();
// 如果进程数小于最低进程数
$this->mutiForkWorker($this->minWorkerNum - $workerLength);
$workerLength = $this->getWorkerLength();
// 创建常驻worker进程失败, 下次检查继续尝试创建
if ($workerLength <= 0) {
continue;
}
if ($workerLength >= $this->maxWorkerNum) {
// 不需要增加进程
continue;
}
$num = $this->calculateAddWorkerNum();
// 不允许超过最大进程数
$num = min($num, $this->maxWorkerNum - $workerLength);
// 创建空闲自动退出worker进程
$this->mutiForkWorker($num, true);
}
}
protected function getWorkerLength()
{
return count($this->worker_list);
}
//信号处理函数
public function sig_handler($sig)
{
switch ($sig) {
case SIGTERM:
case SIGINT:
case SIGQUIT:
// 退出: 给子进程发送退出信号,退出完成后自己退出
// 先标记一下,子进程完全退出后才能结束
$this->stop_service = true;
// 给子进程发送信号
foreach ($this->worker_list as $pid => $v) {
posix_kill($pid, SIGTERM);
}
break;
case SIGCHLD:
// 子进程退出, 回收子进程, 并且判断程序是否需要退出
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
// 去除子进程
unset($this->worker_list[$pid]);
// 子进程是否正常退出
// if (pcntl_wifexited($status)) {
// //
// }
}
$this->checkExit();
break;
default:
$this->default_sig_handler($sig);
break;
}
}
public function child_sig_handler($sig)
{
switch ($sig) {
case SIGINT:
case SIGQUIT:
case SIGTERM:
$this->stop_service = true;
break;
// 操作比较危险 在处理任务当初强制终止
// case SIGTERM:
// // 强制退出
// $this->stop_service = true;
// $this->status = self::WORKER_STATUS_TERMINATED;
// $this->beforeWorkerExitHandler();
// $this->status = self::WORKER_STATUS_EXITING;
// die(1);
// break;
}
}
protected function checkExit($msg = '')
{
if ($this->stop_service && empty($this->worker_list)) {
$this->beforeMasterExitHandler();
die($msg ?:'Master 进程结束, Worker 进程全部退出');
}
}
protected function forkWorker($autoQuit = false, $maxTryTimes = 3)
{
$times = 1;
do {
$pid = pcntl_fork();
if ($pid == -1) {
++$times;
} elseif($pid) {
$this->worker_list[$pid] = true;
//echo 'pid:', $pid, "\n";
return $pid;
} else {
// 子进程 消费
$this->autoQuit = $autoQuit;
$this->master = false;
// 处理信号
pcntl_signal(SIGTERM, [$this, 'child_sig_handler']);
pcntl_signal(SIGINT, [$this, 'child_sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'child_sig_handler']);
exit($this->runChild()); // worker进程结束
}
} while ($times <= $maxTryTimes);
// fork 3次都失败
return false;
}
/**
* 子进程处理内容
*/
protected function runChild()
{
$noDataLoopTime = 0;
$status = 0;
while (!$this->autoQuit || ($noDataLoopTime <= $this->waitTaskLoopTimes)) {
// 处理退出
if ($this->stop_service) {
break;
}
$this->taskData = null;
try {
$this->taskData = $this->deQueue();
if ($this->taskData) {
$noDataLoopTime = 1; // 重新从1开始
$this->status = self::WORKER_STATUS_WORKING;
$this->consumeByRetry($this->taskData);
$this->status = self::WORKER_STATUS_FINISHED;
} else {
$this->status = self::WORKER_STATUS_IDLE;
// 避免溢出
$noDataLoopTime = $noDataLoopTime >= PHP_INT_MAX ? PHP_INT_MAX : ($noDataLoopTime + 1);
// 等待队列
$this->msleep($this->waitTaskTime);
}
$status = 0;
} catch (\RedisException $e) {
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 1;
} catch (\Exception $e) {
// 消费出现错误
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 2;
}
}
$this->beforeWorkerExitHandler();
$this->status = self::WORKER_STATUS_EXITING;
return $status;
}
/**
* @param $data
* @param int $tryTimes
* @throws \Exception
*/
protected function consumeByRetry($data, $tryTimes = 1)
{
$tryTimes = 1;
$exception = null;
// consume 返回false 为失败
while ($tryTimes <= $this->consumeTryTimes) {
try {
return $this->consume($data);
} catch (\Exception $e) {
$exception = $e;
++$tryTimes;
}
}
// 最后一次还报错 写日志
if (($tryTimes > $this->consumeTryTimes) && $exception) {
throw $exception;
}
}
/**
* @param $mixed
* @param $filename
* @param $header
* @param bool $trace
* @return bool
* @throws \Exception
*/
protected function writeLog($mixed, $filename, $header, $trace = false)
{
if (is_string($mixed)) {
$text = $mixed;
} else {
$text = var_export($mixed, true);
}
$trace_list = "";
if ($trace) {
$_t = debug_backtrace();
$trace_list = "-- TRACE : \r\n";
foreach ($_t as $_line) {
$trace_list .= "-- " . $_line ['file'] . "[" . $_line ['line'] . "] : " . $_line ['function'] . "()" . "\r\n";
}
}
$text = "\r\n=" . $header . "==== " . strftime("[%Y-%m-%d %H:%M:%S] ") . " ===\r\n<" . getmypid() . "> : " . $text . "\r\n" . $trace_list;
$h = fopen($filename, 'a');
if (! $h) {
throw new \Exception('Could not open logfile:' . $filename);
}
// exclusive lock, will get released when the file is closed
if (! flock($h, LOCK_EX)) {
return false;
}
if (fwrite($h, $text) === false) {
throw new \Exception('Could not write to logfile:' . $filename);
}
flock($h, LOCK_UN);
fclose($h);
return true;
}
protected function msleep($time)
{
usleep($time * 1000000);
}
public function exceptionHandler($exception)
{
if ($this->isMaster()) {
$msg = '父进程['.posix_getpid().']错误退出中:' . $exception->getMessage();
$this->log($msg);
$this->masterWaitExit(true, $msg);
} else {
$this->child_sig_handler(SIGTERM);
}
}
public function isMaster()
{
return $this->master;
}
/**
* 默认的 worker 数量增加处理
*
* @return int
*/
public function calculateAddWorkerNum()
{
$workerLength = $this->getWorkerLength();
$taskLength = $this->getTaskLength();
// 还不够多
if (($taskLength / $workerLength <3) && ($taskLength - $workerLength <10)) {
return 0;
}
// 增加一定数量的进程
return ceil($this->maxWorkerNum - $workerLength / 2);
}
/**
* 自定义日子文件
*
* @return string
*/
protected function getLogFile()
{
return $this->logFile;
}
/**
* 自定义消费错误函数
*
* @param [type] $data
* @param \Exception $e
* @return void
*/
protected function consumeFail($data, \Exception $e)
{
$this->log(['data' => $data, 'errorCode' => $e->getCode(), 'errorMsg' => get_class($e) . ' : ' . $e->getMessage()]);
}
protected function beforeWorkerExitHandler()
{
foreach ($this->workerExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
/**
* 设置Worker自定义结束回调
*
* @param mixed $func
* @param boolean $prepend
* @return void
*/
public function setWorkerExitCallback($callback, $prepend = false)
{
return $this->setCallbackQueue('workerExitCallback', $callback, $prepend);
}
/**
* 设置Master自定义结束回调
*
* @param callable $func
* @param boolean $prepend
* @return void
*/
public function setMasterExitCallback(callable $callback, $prepend = false)
{
return $this->setCallbackQueue('masterExitCallback', $callback, $prepend);
}
protected function setCallbackQueue($queueName, $callback, $prepend = false)
{
if (! isset($this->$queueName) || ! is_array($this->$queueName)) {
return false;
}
if (is_null($callback)) {
$this->$queueName = []; // 如果传递 null 就清空
return true;
} elseif (! is_callable($callback)) {
return false;
}
if ($prepend) {
array_unshift($this->$queueName, $callback);
} else {
$this->$queueName[] = $callback;
}
return true;
}
protected function beforeMasterExitHandler()
{
foreach ($this->masterExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
protected function default_sig_handler($sig)
{
}
/**
* 得到待处理任务数量
*/
abstract protected function getTaskLength();
/**
* 出队
* @return mixed
*/
abstract public function deQueue();
/**
* 入队
* @param $data
* @return int
*/
abstract public function enQueue($data);
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
abstract protected function consume($data);
}

Demo

基于redis 的 生产者-消费者模式


RedisProducterConsumer.php


require "../src/MasterWorker.php";
class RedisProducterConsumer extends MasterWorker
{
const QUERY_NAME = 'query_name';
/**
* Master 和 Worker 的连接分开,否则会出现问题
*
* @var Redis[]
*/
protected $redis_cOnnections= [];
public function __construct($optiOns= [])
{
parent::__construct($options);
// 设置退出回调
$this->setWorkerExitCallback(function ($worker) {
$this->closeRedis();
// 处理结束,把redis关闭
$this->log('进程退出:' . posix_getpid());
});
$this->setMasterExitCallback(function ($master) {
$this->closeRedis();
$this->log('master 进程退出:' . posix_getpid());
});
}
/**
* 得到队列长度
*/
protected function getTaskLength()
{
return (int) $this->getRedis()->lSize(static::QUERY_NAME);
}
/**
* 出队
* @return mixed
*/
public function deQueue()
{
return $this->getRedis()->lPop(static::QUERY_NAME);
}
/**
* 入队
* @param $data
* @return int
*/
public function enQueue($data)
{
return $this->getRedis()->rPush(static::QUERY_NAME, (string) $data);
}
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
protected function consume($data)
{
// 错误抛出异常
//throw new Exception('错误信息');
$this->log('消费中 ' . $data);
$this->msleep(1);
$this->log('消费结束:' . $data . '; 剩余个数:' . $this->getTaskLength());
}
/**
* @return Redis
*/
public function getRedis()
{
$index = $this->isMaster() ? 'master' : 'worker';
// 后续使用 predis 使用redis池
if (! isset($this->redis_connections[$index])) {
$cOnnection= new \Redis();
$connection->connect('127.0.0.1', 6379, 2);
$this->redis_connections[$index] = $connection;
}
return $this->redis_connections[$index];
}
public function closeRedis()
{
foreach ($this->redis_connections as $key => $connection) {
$connection && $connection->close();
}
}
protected function consumeFail($data, \Exception $e)
{
parent::consumeFail($data, $e);
// 自定义操作,比如重新入队,上报错误等
}
}

调用例子

require "./RedisProducterConsumer.php";
$producterCOnsumer= new RedisProducterConsumer();
// 清空任务队列
$producterConsumer->getRedis()->ltrim(RedisProducterConsumer::QUERY_NAME, 1, 0);
// 写入任务队列
for ($i = 1; $i <= 100; ++$i) {
$producterConsumer->enQueue($i);
}
$producterConsumer->start();
// 接下来的写的代码不会执行
// 查看运行的进程
// ps aux | grep test.php
// 试一试 Ctrl + C 在执行上面产看进程命令

代码地址:https://github.com/MrSuperLi/php-master-wo...




php
并发编程
master-worker


推荐阅读
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 2018-2019学年第六周《Java数据结构与算法》学习总结
    本文总结了2018-2019学年第六周在《Java数据结构与算法》课程中的学习内容,重点介绍了非线性数据结构——树的相关知识及其应用。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入解析Serverless架构模式
    本文将详细介绍Serverless架构模式的核心概念、工作原理及其优势。通过对比传统架构,探讨Serverless如何简化应用开发与运维流程,并介绍当前主流的Serverless平台。 ... [详细]
  • 本文详细介绍了如何在Kendo UI for jQuery的数据管理组件中,将行标题字段呈现为锚点(即可点击链接),帮助开发人员更高效地实现这一功能。通过具体的代码示例和解释,即使是新手也能轻松掌握。 ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • 本文详细介绍了Java中实现异步调用的多种方式,包括线程创建、Future接口、CompletableFuture类以及Spring框架的@Async注解。通过代码示例和深入解析,帮助读者理解并掌握这些技术。 ... [详细]
  • 当unique验证运到图片上传时
    2019独角兽企业重金招聘Python工程师标准model:public$imageFile;publicfunctionrules(){return[[[na ... [详细]
  • LeetCode 690:计算员工的重要性评分
    在解决LeetCode第690题时,我记录了详细的解题思路和方法。该问题要求根据员工的ID计算其重要性评分,包括直接和间接下属的重要性。本文将深入探讨如何使用哈希表(Map)来高效地实现这一目标。 ... [详细]
  • 斯特林数与幂
    参考资料:https:www.luogu.com.cnblogchtholly-willemsolution-p5408https:blog.csdn.netguizhiyuart ... [详细]
  • 深入解析RDMA中的队列对(Queue Pair)
    本文将详细探讨RDMA架构中的关键组件——队列对(Queue Pair,简称QP),包括其基本概念、硬件与软件实现、QPC的作用、QPN的分配机制以及用户接口和状态机。通过这些内容,读者可以更全面地理解QP在RDMA通信中的重要性和工作原理。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
author-avatar
糖糖糖开水
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有