热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Swoole实现Websocket每秒钟主动推送

Swoole实现Websocket主动推送。需求:需要每秒钟向前台推送一次行情数据;向某个用户推送消息;及时响应前端发出的请求;握手时校验令牌;每个IP不超过十个连接。因为网上……

Swoole实现Websocket主动推送

需求:
需要每秒钟向前台推送一次行情数据。
向某个用户推送消息
及时响应前端发出的请求
握手时校验令牌
每个ip 不超过十个连接
因为网上没有很具体的相关文档,只能摸着石头过河,现功能已经大致实现,如果有应用不合理的地方希望大神不吝赐教!

Swoole实现Websocket每秒钟主动推送
// Load the settings array returned by the config file; the Websocket class
// reads the listen host/port and the Redis connection details from it.
$config = require '/config/swoole.php';
/**
 * Swoole WebSocket server that pushes market data to browsers.
 *
 * Responsibilities:
 *  - broadcast changed market quotes to every connected client once per second;
 *  - deliver pending per-user messages stored in the Redis hash `swoole_new_push`;
 *  - answer client requests (`kline`, `login`, `respond`) through task workers;
 *  - perform the WebSocket handshake manually, requiring a sub-protocol header
 *    and rate-limiting handshakes per client IP.
 *
 * Constructing the object configures AND starts the server.
 */
class Websocket
{
    /**
     * Maximum number of handshakes accepted from one client within one
     * calendar minute. The stated requirement is ten connections per IP.
     */
    const MAX_CONN_PER_MINUTE = 10;

    /** @var Swoole\WebSocket\Server The underlying Swoole server instance. */
    public $server;

    /** @var array Settings: web_host, web_port, read_redis_host, read_redis_prot, read_redis_auth. */
    private $config;

    /**
     * Build the server, register background processes and event handlers,
     * then start the server.
     *
     * @param array $config Settings array (see $config property).
     */
    public function __construct($config)
    {
        $this->config = $config;
        $this->server = new Swoole\WebSocket\Server(
            $this->config['web_host'],
            $this->config['web_port']
        );
        $this->server->set([
            'worker_num' => 8,
            'daemonize' => 1,
            'backlog' => 128,
            'max_request' => 10000,
            'heartbeat_check_interval' => 60,
            'task_worker_num' => 24,
        ]);

        $this->server->addProcess($this->createUserPushProcess());
        $this->server->addProcess($this->createMarketPushProcess());

        $this->server->on('handshake', function ($request, $response) {
            return $this->onHandshake($request, $response);
        });

        // With a custom handshake handler the greeting is sent from
        // onHandshake(); nothing is done here.
        $this->server->on('open', function (Swoole\WebSocket\Server $server, $request) {
        });

        $this->server->on('task', function (Swoole\Server $server, $task_id, $from_id, $data) {
            $this->handleTask($server, $data);
        });

        // Hand every incoming frame to a task worker so the event worker
        // stays free to accept further frames.
        $this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
            $server->task([
                'type' => $frame->data,
                'fd' => $frame->fd,
            ]);
        });

        // Remove the user's online markers when the connection goes away.
        $this->server->on('close', function ($server, $fd) {
            $redis = $this->newRedis();
            $uid = $redis->hGet('swoole_fd_uid', $fd);
            if ($uid) {
                // Only drop the uid -> fd entry if it still points at this
                // connection; the user may already have logged in again on
                // a newer connection.
                if ((string) $redis->hGet('swoole_uid_fd', $uid) === (string) $fd) {
                    $redis->hDel('swoole_uid_fd', $uid);
                }
                $redis->hDel('swoole_fd_uid', $fd);
            }
        });

        // Plain HTTP requests are intentionally not handled.
        $this->server->on('request', function ($request, $response) {
        });

        $this->server->start();
    }

    /**
     * Create the background process that broadcasts changed market quotes
     * to all established WebSocket connections once per second.
     *
     * @return Swoole\Process
     */
    private function createMarketPushProcess()
    {
        return new Swoole\Process(function ($process) {
            while (true) {
                $changed = $this->getRealMarket();
                if ($changed) {
                    // Encode once; the payload is identical for every client.
                    // NOTE(review): the key is spelled 'date' (not 'data');
                    // kept because clients presumably rely on it.
                    $payload = json_encode(['type' => 'market', 'date' => $changed]);
                    foreach ($this->server->connections as $fd) {
                        // Skip connections that have not finished the
                        // WebSocket handshake.
                        if ($this->server->isEstablished($fd)) {
                            $this->server->push($fd, $payload);
                        }
                    }
                }
                sleep(1);
            }
        });
    }

    /**
     * Create the background process that, once per second, delivers every
     * pending message in `swoole_new_push` (uid => message) to the user's
     * connection if that user is currently online.
     *
     * @return Swoole\Process
     */
    private function createUserPushProcess()
    {
        return new Swoole\Process(function ($process) {
            while (true) {
                $redis = $this->newRedis();
                $pending = $redis->hGetAll('swoole_new_push');
                if ($pending) {
                    foreach ($pending as $uid => $message) {
                        // Look up the user's fd to see whether they are online.
                        $fd = (int) $redis->hGet('swoole_uid_fd', $uid);
                        if ($fd && $this->server->isEstablished($fd)) {
                            $this->server->push($fd, json_encode(['type' => 'new', 'date' => $message]));
                        }
                    }
                }
                sleep(1);
            }
        });
    }

    /**
     * Perform the WebSocket handshake manually.
     *
     * The handshake is rejected when the client key is malformed, when no
     * `Sec-WebSocket-Protocol` header is present, or when the client has
     * exceeded the per-minute connection limit.
     *
     * @param Swoole\Http\Request  $request
     * @param Swoole\Http\Response $response
     * @return bool true when the connection was upgraded, false otherwise.
     */
    private function onHandshake($request, $response)
    {
        $clientKey = isset($request->header['sec-websocket-key'])
            ? $request->header['sec-websocket-key']
            : '';
        $pattern = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
        if (1 !== preg_match($pattern, $clientKey) || 16 !== strlen(base64_decode($clientKey))) {
            return $this->rejectHandshake($request, $response);
        }

        if (!isset($request->header['sec-websocket-protocol'])) {
            return $this->rejectHandshake($request, $response);
        }

        // Count this handshake against the client's IP.
        // NOTE(review): x-real-ip is only trustworthy behind a proxy that
        // overwrites it - confirm the deployment. Falls back to the socket
        // peer address when the header is absent.
        if (isset($request->header['x-real-ip'])) {
            $clientIp = $request->header['x-real-ip'];
        } elseif (isset($request->server['remote_addr'])) {
            $clientIp = $request->server['remote_addr'];
        } else {
            $clientIp = '';
        }
        if (!$this->checkMac(md5($clientIp))) {
            return $this->rejectHandshake($request, $response);
        }

        $acceptKey = base64_encode(sha1(
            $clientKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
            true
        ));
        $headers = [
            'Upgrade' => 'websocket',
            'Connection' => 'Upgrade',
            'Sec-WebSocket-Accept' => $acceptKey,
            'Sec-WebSocket-Version' => '13',
            'Sec-WebSocket-Protocol' => $request->header['sec-websocket-protocol'],
        ];
        foreach ($headers as $name => $value) {
            $response->header($name, $value);
        }
        $response->status(101);
        $response->end();

        // Send the full market snapshot as a greeting. It is deferred so the
        // push happens after this callback has returned and the upgrade is
        // complete.
        $fd = $request->fd;
        $this->server->defer(function () use ($fd) {
            $this->server->push($fd, json_encode($this->getAllMarket()));
        });

        return true;
    }

    /**
     * Finish a refused handshake: end the HTTP response and close the socket.
     *
     * @param Swoole\Http\Request  $request
     * @param Swoole\Http\Response $response
     * @return bool Always false.
     */
    private function rejectHandshake($request, $response)
    {
        $response->end();
        $this->server->close($request->fd);

        return false;
    }

    /**
     * Process one client frame inside a task worker.
     *
     * @param Swoole\Server $server Server used to push the reply.
     * @param array         $data   ['type' => raw JSON frame text, 'fd' => sender fd].
     * @return void
     */
    private function handleTask($server, $data)
    {
        $task = json_decode($data['type'], true);
        if (!is_array($task) || !isset($task['type'])) {
            // Not a JSON object with a "type" field: ignore the frame.
            return;
        }
        $fd = $data['fd'];

        switch ($task['type']) {
            // K-line (candlestick) data request.
            case 'kline':
                if (!isset($task['kline_time_type'], $task['market'])
                    || !is_scalar($task['kline_time_type'])
                    || !is_scalar($task['market'])
                ) {
                    break;
                }
                // NOTE(review): kline_time_type comes from the client and is
                // used directly as a Redis hash name - consider restricting
                // it to a whitelist of known K-line hashes.
                $server->push($fd, json_encode([
                    'type' => $task['type'],
                    'kline_time_type' => $task['kline_time_type'],
                    'market' => $task['market'],
                    'data' => $this->getKlineDate($task['kline_time_type'], $task['market']),
                    'history' => $this->getMarketRecord($task['market']),
                ]));
                break;

            // Bind this connection to the user identified by the token.
            case 'login':
                $redis = $this->newRedis();
                $uid = $this->uidForToken($redis, $task);
                if ($uid) {
                    $redis->hSet('swoole_uid_fd', $uid, $fd);
                    $redis->hSet('swoole_fd_uid', $fd, $uid);
                }
                $server->push($fd, json_encode([
                    'type' => 'login',
                    'code' => $uid ? 200 : 400,
                ]));
                break;

            // Client acknowledges a pushed message: stop re-sending it.
            case 'respond':
                $redis = $this->newRedis();
                $uid = $this->uidForToken($redis, $task);
                if ($uid) {
                    $redis->hDel('swoole_new_push', $uid);
                }
                $server->push($fd, json_encode([
                    'type' => 'respond',
                    'code' => $uid ? 200 : 400,
                ]));
                break;

            default:
                break;
        }
    }

    /**
     * Resolve the user id for the token carried by a client request.
     *
     * @param Redis $redis Open Redis connection.
     * @param array $task  Decoded client request.
     * @return string|false The uid stored in `user_token`, or false when the
     *                      token is missing or unknown.
     */
    private function uidForToken($redis, $task)
    {
        if (!isset($task['token']) || !is_scalar($task['token']) || (string) $task['token'] === '') {
            return false;
        }

        return $redis->hGet('user_token', (string) $task['token']);
    }

    /**
     * Fetch K-line data.
     *
     * @param string $type   Name of the Redis hash holding the K-line series.
     * @param string $market Market identifier (hash field).
     * @return string|false Stored value, or false when absent.
     */
    public function getKlineDate($type, $market)
    {
        $redis = $this->newRedis();

        return $redis->hGet($type, $market);
    }

    /**
     * Fetch yesterday's close and today's open/low/high price for a market.
     *
     * @param string $market Market identifier.
     * @return array Keys: yest_close_price, today_open_price,
     *               today_low_price, today_high_price.
     */
    public function getMarketRecord($market)
    {
        $redis = $this->newRedis();
        $record = [];
        foreach (['yest_close_price', 'today_open_price', 'today_low_price', 'today_high_price'] as $hash) {
            $record[$hash] = $redis->hGet($hash, $market);
        }

        return $record;
    }

    /**
     * Get the complete market snapshot.
     *
     * @return array market => ['real' => ..., 'sell' => ..., 'buy' => ...];
     *               an empty array when no market data exists.
     */
    public function getAllMarket()
    {
        $redis = $this->newRedis();
        $buy = $redis->hGetAll('real_market_buy') ?: [];
        $sell = $redis->hGetAll('real_market_sell') ?: [];
        $real = $redis->hGetAll('real_market') ?: [];

        $snapshot = [];
        foreach ($real as $market => $quote) {
            $snapshot[$market] = [
                'real' => $quote,
                'sell' => isset($sell[$market]) ? $sell[$market] : null,
                'buy' => isset($buy[$market]) ? $buy[$market] : null,
            ];
        }

        return $snapshot;
    }

    /**
     * Get only the markets whose quote differs from the JSON stored in the
     * `history_market` hash; used for the per-second broadcast.
     *
     * @return array market => ['real' => ..., 'sell' => ..., 'buy' => ...];
     *               an empty array when nothing changed.
     */
    public function getRealMarket()
    {
        $redis = $this->newRedis();
        $buy = $redis->hGetAll('real_market_buy') ?: [];
        $sell = $redis->hGetAll('real_market_sell') ?: [];
        $real = $redis->hGetAll('real_market') ?: [];

        $changed = [];
        foreach ($real as $market => $quote) {
            $entry = [
                'real' => $quote,
                'sell' => isset($sell[$market]) ? $sell[$market] : null,
                'buy' => isset($buy[$market]) ? $buy[$market] : null,
            ];
            // Unchanged since the recorded history: nothing to broadcast.
            if ($redis->hGet('history_market', $market) === json_encode($entry)) {
                continue;
            }
            $changed[$market] = $entry;
        }

        return $changed;
    }

    /**
     * Rate-limit handshakes: at most MAX_CONN_PER_MINUTE per client within
     * one calendar minute.
     *
     * The counter is kept in the Redis hash `Conn_Mac` as JSON
     * {"time": <minute start timestamp>, "rate": <count>}.
     *
     * NOTE(review): the read-modify-write below is not atomic, so concurrent
     * handshakes from one client can under-count.
     *
     * @param string $mac    Client identifier (the caller passes md5 of the IP).
     * @param int    $status Unused; kept for backward compatibility.
     * @return bool true when the connection is allowed, false when the limit
     *              has been reached.
     */
    public function checkMac($mac, $status = 0)
    {
        $window = strtotime(date('Y-m-d H:i'));
        $redis = $this->newRedis();

        $raw = $redis->hGet('Conn_Mac', $mac);
        $record = $raw ? json_decode($raw, true) : null;

        $sameWindow = is_array($record)
            && isset($record['time'], $record['rate'])
            && (int) $record['time'] === $window;

        if (!$sameWindow) {
            // First connection, unreadable record, or a new minute: restart the count.
            $redis->hSet('Conn_Mac', $mac, json_encode(['time' => $window, 'rate' => 1]));

            return true;
        }

        if ($record['rate'] < self::MAX_CONN_PER_MINUTE) {
            $redis->hSet('Conn_Mac', $mac, json_encode(['time' => $window, 'rate' => $record['rate'] + 1]));

            return true;
        }

        // Limit reached for this minute.
        return false;
    }

    /**
     * Open a new authenticated Redis connection.
     *
     * @return Redis
     * @throws RuntimeException When the connection cannot be established.
     */
    public function newRedis()
    {
        $redis = new Redis();
        // 'read_redis_prot' (sic) is the key name used by the config file.
        if (!$redis->connect($this->config['read_redis_host'], $this->config['read_redis_prot'])) {
            throw new RuntimeException('Unable to connect to Redis');
        }
        $redis->auth($this->config['read_redis_auth']);

        return $redis;
    }
}
// Instantiate the server; the Websocket constructor receives the loaded
// settings, so no reference to the object needs to be kept here.
new Websocket($config);
?>
Swoole实现Websocket每秒钟主动推送
Swoole实现Websocket每秒钟主动推送

转:https://blog.51cto.com/14218196/2359306



推荐阅读
author-avatar
白羊幸福的佳佳
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有