热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PHP并发编程之MasterWorker模式

Master-Worker的模式结构Master进程为主进程,它维护了一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程,不停地从任务队列中提






Master-Worker 的模式结构

Master 进程为主进程,它维护了一个 Worker 进程队列、子任务队列和子结果集。Worker 进程队列中的 Worker 进程,不停地从任务队列中提取要处理的子任务,并将子任务的处理结果写入结果集。


  • 使用多进程

  • 支持 Worker 错误重试,仅仅实现业务即可

  • 任务累积过多,自动 Fork Worker 进程

  • 常驻 Worker 进程,减少进程 Fork 开销

  • 非常驻 Worker 进程闲置,自动退出回收

  • 支持日志

Demo: 基于 Redis 生产消费队列 在 test 目录中

代码:

declare(ticks = 1);
// 必须先使用语句declare(ticks=1),否则注册的singal-handel就不会执行了
//error_reporting(E_ERROR);
abstract class MasterWorker
{
// 子进程配置属性
protected $maxWorkerNum; // 最多只能开启进程数
protected $minWorkerNum; // 最少常驻子进程数
protected $waitTaskTime; // 等待任务时间,单位秒
protected $waitTaskLoopTimes; // 连续这么多次队列为空就退出子进程
protected $consumeTryTimes; // 连续消费失败次数
// 父进程专用属性
protected $worker_list = [];
protected $check_internal = 1;
protected $masterExitCallback = [];
// 子进程专用属性
protected $autoQuit = false;
protected $status = self::WORKER_STATUS_IDLE;
protected $taskData; // 任务数据
protected $workerExitCallback = [];
// 通用属性
protected $stop_service = false;
protected $master = true;
// 通用配置
protected $logFile;
const WORKER_STATUS_IDLE = 'idle';
const WORKER_STATUS_FINISHED = 'finished';
const WORKER_STATUS_EXITING = 'exiting';
const WORKER_STATUS_WORKING = 'working';
const WORKER_STATUS_FAIL = 'fail';
const WORKER_STATUS_TERMINATED = 'terminated';
public function __construct($optiOns= [])
{
$this->initConfig($options);
}
protected function initConfig($optiOns= [])
{
$defaultCOnfig= [
'maxWorkerNum' => 10,
'minWorkerNum' => 3,
'waitTaskTime' => 0.01,
'waitTaskLoopTimes' => 50,
'consumeTryTimes' => 3,
'logFile' => './master_worker.log',
];
foreach ($defaultConfig as $key => $default) {
$this->$key = array_key_exists($key, $options) ? $options[$key] : $default;
}
}
public function start()
{
// 父进程异常,需要终止子进程
set_exception_handler([$this, 'exceptionHandler']);
// fork minWorkerNum 个子进程
$this->mutiForkWorker($this->minWorkerNum);
if ($this->getWorkerLength() <= 0) {
$this->masterWaitExit(true, 'fork 子进程全部失败');
}
// 父进程监听信号
pcntl_signal(SIGTERM, [$this, 'sig_handler']);
pcntl_signal(SIGINT, [$this, 'sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'sig_handler']);
pcntl_signal(SIGCHLD, [$this, 'sig_handler']);
// 监听队列,队列比进程数多很多,则扩大进程,扩大部分的进程会空闲自动退出
$this->checkWorkerLength();
$this->masterWaitExit();
}
/**
* Master 等待退出
*
* @param boolean $force 强制退出
* @param string $msg 退出 message
* @return void
*/
protected function masterWaitExit($force = false, $msg = '')
{
// 强制发送退出信号
$force && $this->sig_handler(SIGTERM);
// 等到子进程退出
while ($this->stop_service) {
$this->checkExit($msg);
$this->msleep($this->check_internal);
}
}
protected function log($msg)
{
try {
$header = $this->isMaster() ? 'Master [permanent]' : sprintf('Worker [%s]', $this->autoQuit ? 'temporary' : 'permanent');
$this->writeLog($msg, $this->getLogFile(), $header);
} catch (\Exception $e) {
}
}
protected function mutiForkWorker($num, $autoQuit = false, $maxTryTimes = 3)
{
for ($i = 1; $i <= $num; ++$i) {
$this->forkWorker($autoQuit, $maxTryTimes);
}
}
protected function checkWorkerLength()
{
// 如果要退出父进程,就不执行检测
while (! $this->stop_service) {
$this->msleep($this->check_internal);
// 处理进程
$workerLength = $this->getWorkerLength();
// 如果进程数小于最低进程数
$this->mutiForkWorker($this->minWorkerNum - $workerLength);
$workerLength = $this->getWorkerLength();
// 创建常驻worker进程失败, 下次检查继续尝试创建
if ($workerLength <= 0) {
continue;
}
if ($workerLength >= $this->maxWorkerNum) {
// 不需要增加进程
continue;
}
$num = $this->calculateAddWorkerNum();
// 不允许超过最大进程数
$num = min($num, $this->maxWorkerNum - $workerLength);
// 创建空闲自动退出worker进程
$this->mutiForkWorker($num, true);
}
}
protected function getWorkerLength()
{
return count($this->worker_list);
}
//信号处理函数
public function sig_handler($sig)
{
switch ($sig) {
case SIGTERM:
case SIGINT:
case SIGQUIT:
// 退出: 给子进程发送退出信号,退出完成后自己退出
// 先标记一下,子进程完全退出后才能结束
$this->stop_service = true;
// 给子进程发送信号
foreach ($this->worker_list as $pid => $v) {
posix_kill($pid, SIGTERM);
}
break;
case SIGCHLD:
// 子进程退出, 回收子进程, 并且判断程序是否需要退出
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
// 去除子进程
unset($this->worker_list[$pid]);
// 子进程是否正常退出
// if (pcntl_wifexited($status)) {
// //
// }
}
$this->checkExit();
break;
default:
$this->default_sig_handler($sig);
break;
}
}
public function child_sig_handler($sig)
{
switch ($sig) {
case SIGINT:
case SIGQUIT:
case SIGTERM:
$this->stop_service = true;
break;
// 操作比较危险 在处理任务当初强制终止
// case SIGTERM:
// // 强制退出
// $this->stop_service = true;
// $this->status = self::WORKER_STATUS_TERMINATED;
// $this->beforeWorkerExitHandler();
// $this->status = self::WORKER_STATUS_EXITING;
// die(1);
// break;
}
}
protected function checkExit($msg = '')
{
if ($this->stop_service && empty($this->worker_list)) {
$this->beforeMasterExitHandler();
die($msg ?:'Master 进程结束, Worker 进程全部退出');
}
}
protected function forkWorker($autoQuit = false, $maxTryTimes = 3)
{
$times = 1;
do {
$pid = pcntl_fork();
if ($pid == -1) {
++$times;
} elseif($pid) {
$this->worker_list[$pid] = true;
//echo 'pid:', $pid, "\n";
return $pid;
} else {
// 子进程 消费
$this->autoQuit = $autoQuit;
$this->master = false;
// 处理信号
pcntl_signal(SIGTERM, [$this, 'child_sig_handler']);
pcntl_signal(SIGINT, [$this, 'child_sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'child_sig_handler']);
exit($this->runChild()); // worker进程结束
}
} while ($times <= $maxTryTimes);
// fork 3次都失败
return false;
}
/**
* 子进程处理内容
*/
protected function runChild()
{
$noDataLoopTime = 0;
$status = 0;
while (!$this->autoQuit || ($noDataLoopTime <= $this->waitTaskLoopTimes)) {
// 处理退出
if ($this->stop_service) {
break;
}
$this->taskData = null;
try {
$this->taskData = $this->deQueue();
if ($this->taskData) {
$noDataLoopTime = 1; // 重新从1开始
$this->status = self::WORKER_STATUS_WORKING;
$this->consumeByRetry($this->taskData);
$this->status = self::WORKER_STATUS_FINISHED;
} else {
$this->status = self::WORKER_STATUS_IDLE;
// 避免溢出
$noDataLoopTime = $noDataLoopTime >= PHP_INT_MAX ? PHP_INT_MAX : ($noDataLoopTime + 1);
// 等待队列
$this->msleep($this->waitTaskTime);
}
$status = 0;
} catch (\RedisException $e) {
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 1;
} catch (\Exception $e) {
// 消费出现错误
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 2;
}
}
$this->beforeWorkerExitHandler();
$this->status = self::WORKER_STATUS_EXITING;
return $status;
}
/**
* @param $data
* @param int $tryTimes
* @throws \Exception
*/
protected function consumeByRetry($data, $tryTimes = 1)
{
$tryTimes = 1;
$exception = null;
// consume 返回false 为失败
while ($tryTimes <= $this->consumeTryTimes) {
try {
return $this->consume($data);
} catch (\Exception $e) {
$exception = $e;
++$tryTimes;
}
}
// 最后一次还报错 写日志
if (($tryTimes > $this->consumeTryTimes) && $exception) {
throw $exception;
}
}
/**
* @param $mixed
* @param $filename
* @param $header
* @param bool $trace
* @return bool
* @throws \Exception
*/
protected function writeLog($mixed, $filename, $header, $trace = false)
{
if (is_string($mixed)) {
$text = $mixed;
} else {
$text = var_export($mixed, true);
}
$trace_list = "";
if ($trace) {
$_t = debug_backtrace();
$trace_list = "-- TRACE : \r\n";
foreach ($_t as $_line) {
$trace_list .= "-- " . $_line ['file'] . "[" . $_line ['line'] . "] : " . $_line ['function'] . "()" . "\r\n";
}
}
$text = "\r\n=" . $header . "==== " . strftime("[%Y-%m-%d %H:%M:%S] ") . " ===\r\n<" . getmypid() . "> : " . $text . "\r\n" . $trace_list;
$h = fopen($filename, 'a');
if (! $h) {
throw new \Exception('Could not open logfile:' . $filename);
}
// exclusive lock, will get released when the file is closed
if (! flock($h, LOCK_EX)) {
return false;
}
if (fwrite($h, $text) === false) {
throw new \Exception('Could not write to logfile:' . $filename);
}
flock($h, LOCK_UN);
fclose($h);
return true;
}
protected function msleep($time)
{
usleep($time * 1000000);
}
public function exceptionHandler($exception)
{
if ($this->isMaster()) {
$msg = '父进程['.posix_getpid().']错误退出中:' . $exception->getMessage();
$this->log($msg);
$this->masterWaitExit(true, $msg);
} else {
$this->child_sig_handler(SIGTERM);
}
}
public function isMaster()
{
return $this->master;
}
/**
* 默认的 worker 数量增加处理
*
* @return int
*/
public function calculateAddWorkerNum()
{
$workerLength = $this->getWorkerLength();
$taskLength = $this->getTaskLength();
// 还不够多
if (($taskLength / $workerLength <3) && ($taskLength - $workerLength <10)) {
return 0;
}
// 增加一定数量的进程
return ceil($this->maxWorkerNum - $workerLength / 2);
}
/**
* 自定义日子文件
*
* @return string
*/
protected function getLogFile()
{
return $this->logFile;
}
/**
* 自定义消费错误函数
*
* @param [type] $data
* @param \Exception $e
* @return void
*/
protected function consumeFail($data, \Exception $e)
{
$this->log(['data' => $data, 'errorCode' => $e->getCode(), 'errorMsg' => get_class($e) . ' : ' . $e->getMessage()]);
}
protected function beforeWorkerExitHandler()
{
foreach ($this->workerExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
/**
* 设置Worker自定义结束回调
*
* @param mixed $func
* @param boolean $prepend
* @return void
*/
public function setWorkerExitCallback($callback, $prepend = false)
{
return $this->setCallbackQueue('workerExitCallback', $callback, $prepend);
}
/**
* 设置Master自定义结束回调
*
* @param callable $func
* @param boolean $prepend
* @return void
*/
public function setMasterExitCallback(callable $callback, $prepend = false)
{
return $this->setCallbackQueue('masterExitCallback', $callback, $prepend);
}
protected function setCallbackQueue($queueName, $callback, $prepend = false)
{
if (! isset($this->$queueName) || ! is_array($this->$queueName)) {
return false;
}
if (is_null($callback)) {
$this->$queueName = []; // 如果传递 null 就清空
return true;
} elseif (! is_callable($callback)) {
return false;
}
if ($prepend) {
array_unshift($this->$queueName, $callback);
} else {
$this->$queueName[] = $callback;
}
return true;
}
protected function beforeMasterExitHandler()
{
foreach ($this->masterExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
protected function default_sig_handler($sig)
{
}
/**
* 得到待处理任务数量
*/
abstract protected function getTaskLength();
/**
* 出队
* @return mixed
*/
abstract public function deQueue();
/**
* 入队
* @param $data
* @return int
*/
abstract public function enQueue($data);
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
abstract protected function consume($data);
}

Demo

基于redis 的 生产者-消费者模式


RedisProducterConsumer.php


require "../src/MasterWorker.php";
class RedisProducterConsumer extends MasterWorker
{
const QUERY_NAME = 'query_name';
/**
* Master 和 Worker 的连接分开,否则会出现问题
*
* @var Redis[]
*/
protected $redis_cOnnections= [];
public function __construct($optiOns= [])
{
parent::__construct($options);
// 设置退出回调
$this->setWorkerExitCallback(function ($worker) {
$this->closeRedis();
// 处理结束,把redis关闭
$this->log('进程退出:' . posix_getpid());
});
$this->setMasterExitCallback(function ($master) {
$this->closeRedis();
$this->log('master 进程退出:' . posix_getpid());
});
}
/**
* 得到队列长度
*/
protected function getTaskLength()
{
return (int) $this->getRedis()->lSize(static::QUERY_NAME);
}
/**
* 出队
* @return mixed
*/
public function deQueue()
{
return $this->getRedis()->lPop(static::QUERY_NAME);
}
/**
* 入队
* @param $data
* @return int
*/
public function enQueue($data)
{
return $this->getRedis()->rPush(static::QUERY_NAME, (string) $data);
}
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
protected function consume($data)
{
// 错误抛出异常
//throw new Exception('错误信息');
$this->log('消费中 ' . $data);
$this->msleep(1);
$this->log('消费结束:' . $data . '; 剩余个数:' . $this->getTaskLength());
}
/**
* @return Redis
*/
public function getRedis()
{
$index = $this->isMaster() ? 'master' : 'worker';
// 后续使用 predis 使用redis池
if (! isset($this->redis_connections[$index])) {
$cOnnection= new \Redis();
$connection->connect('127.0.0.1', 6379, 2);
$this->redis_connections[$index] = $connection;
}
return $this->redis_connections[$index];
}
public function closeRedis()
{
foreach ($this->redis_connections as $key => $connection) {
$connection && $connection->close();
}
}
protected function consumeFail($data, \Exception $e)
{
parent::consumeFail($data, $e);
// 自定义操作,比如重新入队,上报错误等
}
}

调用例子

require "./RedisProducterConsumer.php";
$producterCOnsumer= new RedisProducterConsumer();
// 清空任务队列
$producterConsumer->getRedis()->ltrim(RedisProducterConsumer::QUERY_NAME, 1, 0);
// 写入任务队列
for ($i = 1; $i <= 100; ++$i) {
$producterConsumer->enQueue($i);
}
$producterConsumer->start();
// 接下来的写的代码不会执行
// 查看运行的进程
// ps aux | grep test.php
// 试一试 Ctrl + C 在执行上面产看进程命令

代码地址:https://github.com/MrSuperLi/php-master-wo...




php
并发编程
master-worker


推荐阅读
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 本文详细介绍了 Apache Jena 库中的 Txn.executeWrite 方法,通过多个实际代码示例展示了其在不同场景下的应用,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 微软Exchange服务器遭遇2022年版“千年虫”漏洞
    微软Exchange服务器在新年伊始遭遇了一个类似于‘千年虫’的日期处理漏洞,导致邮件传输受阻。该问题主要影响配置了FIP-FS恶意软件引擎的Exchange 2016和2019版本。 ... [详细]
  • 深入解析for与foreach遍历集合时的性能差异
    本文将详细探讨for循环和foreach(迭代器)在遍历集合时的性能差异,并通过实际代码示例和源码分析,帮助读者理解这两种遍历方式的不同之处。文章内容丰富且专业,旨在为编程爱好者提供有价值的参考。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 深入理解Redis的数据结构与对象系统
    本文详细探讨了Redis中的数据结构和对象系统的实现,包括字符串、列表、集合、哈希表和有序集合等五种核心对象类型,以及它们所使用的底层数据结构。通过分析源码和相关文献,帮助读者更好地理解Redis的设计原理。 ... [详细]
  • 本文探讨了哪些数据库支持队列式的写入操作(即一个键对应一个队列,数据可以连续入队),并且具备良好的持久化特性。这类需求通常出现在需要高效处理和存储大量有序数据的场景中。 ... [详细]
  • 本文介绍如何在Spring Boot项目中集成Redis,并通过具体案例展示其配置和使用方法。包括添加依赖、配置连接信息、自定义序列化方式以及实现仓储接口。 ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
author-avatar
糖糖糖开水
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有