基于 Swoole 在 Laravel 中实现异步任务队列
由 学院君 创建于1年前, 最后更新于 1年前
版本号 #1
14019 views
10 likes
2 collects
Swoole 异步任务
实现原理
我们知道,PHP 本身的设计是同步阻塞的,不支持多线程和异步 IO,所以当我们执行一些耗时的操作,比如发送广播,或者邮件,如果直接在当前进程中操作,会导致服务器响应变慢,因此要借助一些第三方服务来处理以实现异步功能,比如队列,而 Swoole 作为 PHP 异步网络通信引擎,自然也对异步任务处理提供了支持,其底层的实现原理和常见的异步队列类似:将耗时任务投递到 TaskWorker 进程池后返回(相应任务会通过 TaskWorker 异步执行,执行成功后可以调用事先注册的回调函数进行后续处理),继续后续业务逻辑的执行,而不影响当前请求的处理速度。关于 TaskWorker 后面我们介绍 Swoole 底层原理的时候还会详细介绍。
示例代码
我们可以基于 Swoole 入门教程中编写的 TCP 服务器为基础来实现一个异步任务服务器用来处理异步任务,只需添加一个任务处理和一个任务完成回调即可,此外,还需要配置 TaskWorker 进程数以保证任务处理的速度,可以根据任务的耗时和任务量配置其值,通常我们可以将其配置为 CPU 数量的两倍:
$server = new \Swoole\Server("127.0.0.1", 9503);
// 设置异步任务的工作进程数量
$server->set(array('task_worker_num' => 4));
//收到请求时触发
$server->on('receive', function(\Swoole\Server $server, $fd, $from_id, $data) {
//投递异步任务
$task_id = $server->task($data);
echo "异步任务投递成功: id=$task_id\n";
$server->send($fd, "数据已接收,处理中...");
});
// 处理异步任务
$server->on('task', function (\Swoole\Server $server, $task_id, $from_id, $data) {
echo "新的待处理异步任务[id=$task_id]".PHP_EOL;
// todo 处理异步任务
// 返回任务执行的结果
$server->finish("$data -> OK");
});
// 处理异步任务的结果
$server->on('finish', function (\Swoole\Server $server, $task_id, $data) {
echo "异步任务[$task_id] 处理完成: $data".PHP_EOL;
});
$server->start();
我们将该文件保存为 async_task_server.php,然后在命令行启动这个服务器:
php async_task_server.php
php tcp_client.php
服务器投递任务到 TaskWorker 后会立即将处理信息发送给客户端:
然后服务器会异步处理任务:
在 Laravel 中基于 Swoole 实现异步任务队列
还是以 LaravelS 扩展包为基础,我们在 Laravel 项目中实现基于 Swoole 实现异步任务队列功能,其原理就是上面介绍的 Swoole 异步任务,只不过该扩展包对其做了封装而已。
编写任务类
首先我们创建一个继承自 Hhxsv5\LaravelS\Swoole\Task\Task 基类的待处理任务类 TestTask:
namespace App\Jobs;
use Hhxsv5\LaravelS\Swoole\Task\Task;
use Illuminate\Support\Facades\Log;
class TestTask extends Task
{
// 待处理任务数据
private $data;
// 任务处理结果
private $result;
public function __construct($data)
{
$this->data = $data;
}
// 任务投递调用 task 回调时触发,等同于 Swoole 中的 onTask 逻辑
public function handle()
{
Log::info(__CLASS__ . ': 开始处理任务', [$this->data]);
// todo 耗时任务具体处理逻辑在这里编写
sleep(3); // 模拟任务需要3秒才能执行完毕
$this->result = 'The result of ' . $this->data . ' is balabalabala';
}
// 任务完成调用 finish 回调时触发,等同于 Swoole 中的 onFinish 逻辑
public function finish()
{
Log::info(__CLASS__ . ': 任务处理完成', [$this->result]);
// 可以在这里触发后续要执行的任务,或者执行其他善后逻辑
}
}
编写测试代码
然后在 routes/web.php 编写投递异步任务的测试代码如下:
Route::get('/task/test', function () {
$task = new \App\Jobs\TestTask('测试异步任务');
$success = \Hhxsv5\LaravelS\Swoole\Task\Task::deliver($task); // 异步投递任务,触发调用任务类的 handle 方法
var_dump($success);
});
修改配置文件
此外还要在配置文件 config/laravels.php 中取消 task_worker_num 配置项前面的注释:
'swoole' => [
...
'task_worker_num' => function_exists('swoole_cpu_num') ? swoole_cpu_num() * 2 : 8,
...
]
测试异步任务执行
接下来,我们重启启动 Swoole 服务器(基于 Swoole HTTP 服务器访问路由才能成功投递异步任务):
php bin/laravels restart
然后在浏览器中通过 http://todo-s.test/task/test 访问测试路由,页面立即显示投递成功:
bool(true)
然后我们去 storage/logs 目录下查看最新的日志信息,可以看到任务执行其实耗费了 3 秒:
[2019-05-30 16:31:32] local.INFO: App\Jobs\TestTask: 开始处理任务 ["测试异步任务"]
[2019-05-30 16:31:35] local.INFO: App\Jobs\TestTask: 任务处理完成 ["The result of 测试异步任务 is balabalabala"]
这样,我们就成功在 Laravel 项目中基于 Swoole 实现了异步任务消费队列,很简单吧。