set(array(
'worker_num' => 50, //worker进程数量
'task_worker_num' => 10, //task进程数量 即为维持的MySQL连接的数量
));
function my_onReceive($serv, $fd, $from_id, $data)
{
echo "收到数据".$data.PHP_EOL;
//taskwait就是投递一条任务,这里直接传递SQL语句了
//然后阻塞等待SQL完成,并返回结果
$result = $serv->taskwait($data);
echo "任务结束".PHP_EOL;
if ($result !== false) {
list($status, $db_res) = explode(':', $result, 2);
if ($status == 'OK') {
//数据库操作成功了,执行业务逻辑代码,这里就自动释放掉MySQL连接的占用
//将处理结果发送给客户端
$serv->send($fd, var_export(unserialize($db_res), true) . "\n");
} else {
$serv->send($fd, $db_res);
}
return;
} else {
$serv->send($fd, "Error. Task timeout\n");//如果返回的是false,则说明taskwait等待超时,可以设置相应的等待超时时间
}
}
function my_onTask($serv, $task_id, $from_id, $sql)
{
echo "开始做任务 task id:".$task_id.PHP_EOL;
static $link = null;
HELL:
if ($link == null) {
$link = @mysqli_connect("127.0.0.1", "root", "passwd", "database");
if (!$link) {
$link = null;
$serv->finish("ER:" . mysqli_error($link));
return;
}
}
$result = $link->query($sql);
if (!$result) { //如果查询失败了
if(in_array(mysqli_errno($link), [2013, 2006])){//错误码为2013,或者2006,则重连数据库,重新执行sql
$link = null;
goto HELL;
}else{
$serv->finish("ER:" . mysqli_error($link));
return;
}
}
if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
$data = $result->fetch_assoc();
}else{//否则直接返回结果
$data = $result;
}
$serv->finish("OK:" . serialize($data));//调用finish方法,用于在task进程中通知worker进程,投递的任务已完成
//return "OK:".serialize($data);
}
function my_onFinish($serv, $task_id, $data)
{
echo "任务完成";//taskwait 没有触发这个函数。。
echo "AsyncTask Finish:Connect.PID=" . posix_getpid() . PHP_EOL;
}
$serv->on('receive', 'my_onReceive');
$serv->on('task', 'my_onTask');
$serv->on('Finish', 'my_onFinish');
$serv->start();//启动server