热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Swoole实现Websocket每秒钟主动推送

Swoole实现Websocket主动推送需求:需要每秒钟向前台推送一次行情数据。向某个用户推送消息及时响应前端发出的请求握手时效验令牌每个ip不超过十个连接因为网上

Swoole实现Websocket主动推送

需求:
需要每秒钟向前台推送一次行情数据。
向某个用户推送消息
及时响应前端发出的请求
握手时效验令牌
每个ip 不超过十个连接
因为网上没有很具体的相关文档,只能摸着石头过河,现功能已经大致实现,如果有应用不合理的地方希望大神不吝赐教!

Swoole实现Websocket每秒钟主动推送
$config = require('/config/swoole.php');
class Websocket {
public $server;
private $config;
public function __construct($config) {
$this->config = $config;
$server = new Swoole\WebSocket\Server($this->config['web_host'],$this->config['web_port']);
$this->server = $server;
$this->server->set([
'worker_num' => 8,
'daemonize' => 1,
'backlog' => 128,
'max_request' => 10000,
'heartbeat_check_interval' => 60,
'task_worker_num' => 24,
]);
$process = new Swoole\Process(function($process) use ($server) {
while (true) {
$show_market_list = $this->getRealMarket();
if ($show_market_list) {
foreach ($this->server->connections as $conn) {
$this->server->push($conn,json_encode(['type' => 'market','date' => $show_market_list]));
}
}
sleep(1);
}
});
$newprocess = new Swoole\Process(function($process) use ($server){
while (true) {
$redis = $this->newRedis();
$new_list = $redis->hGetALL('swoole_new_push');
if ($new_list) {
//把消息推给所有用户
foreach ($new_list as $k => $v) {
//查找用户的fd 检查用户是否在线
$user_fd = $redis -> hGet('swoole_uid_fd',$k);
if ($user_fd) {
$this->server->push($user_fd,json_encode(['type' => 'new','date' => $v]));
}
}
}
sleep(1);
}
});
$this->server->addProcess($newprocess);
$this->server->addProcess($process);
$this->server->on('handshake',function($request,$response){
$secWebSocketKey = $request->header['sec-websocket-key'];
$patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) {
$response->end();
$this->server->close($request->fd);
return 500;
}
$key = base64_encode(sha1(
$request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
true
));
$headers = [
'Upgrade' => 'websocket',
'Connection' => 'Upgrade',
'Sec-WebSocket-Accept' => $key,
'Sec-WebSocket-Version' => '13',
];
if (isset($request->header['sec-websocket-protocol'])) {
//占用ip
$real_ip = $request->header['x-real-ip'];
$result = $this->checkMac(md5($real_ip));
if (!$result) {
$response->end();
$this->server->close($request->fd);
return 500;
}
$headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol'];
}else{
$response->end();
$this->server->close($request->fd);
return 500;
}
foreach ($headers as $key => $val) {
$response->header($key, $val);
}
$response->status(101);
$response->end();
$getAllMarket = $this->getAllMarket();
$this->server -> push($request->fd,json_encode($getAllMarket));
});
$this->server->on('open', function (swoole_websocket_server $server, $request) {

});$this->server->on('task',function(swoole_server $server, $task_id, $from_id, $data){$task = json_decode($data['type'],1);switch ($task['type']) {//K线case 'kline':$result['type'] = $task['type'];$result['kline_time_type'] = $task['kline_time_type'];$result['market'] = $task['market'];$result['data'] = $this->getKlineDate($task['kline_time_type'],$task['market']);$result['history'] = $this->getMarketRecord($task['market']);$server -> push($data['fd'],json_encode($result));break;//用户注册case 'login':$token = $task['token'];$redis = $this->newRedis();$uid = $redis->hGet('user_token',$token);if ($uid) {//找到了 用户的uid$redis -> hSet('swoole_uid_fd',$uid,$data['fd']);$redis -> hSet('swoole_fd_uid',$data['fd'],$uid);$result['type'] = 'login';$result['code'] = 200;}else{$result['type'] = 'login';$result['code'] = 400;}$server -> push($data['fd'],json_encode($result));break;//响应消息case 'respond':$token = $task['token'];$redis = $this->newRedis();$uid = $redis->hGet('user_token',$token);if ($uid) {$redis->hDel('swoole_new_push',$uid);$result['type'] = 'respond';$result['code'] = 200;}else{$result['type'] = 'respond';$result['code'] = 400;}$server -> push($data['fd'],json_encode($result));break;default:break;}});$this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {$task['type'] = $frame->data;$task['fd'] = $frame->fd;$server -> task($task);});$this->server->on('close', function ($server, $fd) {//删除用户在线$redis = $this->newRedis();$uid = $redis -> hGet('swoole_fd_uid',$fd);if ($uid) {$redis -> hDel('swoole_uid_fd',$uid);$redis -> hDel('swoole_fd_uid',$fd);}});$this->server->on('request', function ($request, $response) {});$this->server->start();
}
//K线数据
public function getKlineDate($type,$market){$redis = $this->newRedis();$date = $redis->hGet($type,$market);return $date;
}
public function getMarketRecord($market){$redis = $this->newRedis();$result['yest_close_price'] = $redis->hget('yest_close_price',$market);$result['today_open_price'] = $redis->hget('today_open_price',$market);$result['today_low_price'] = $redis->hget('today_low_price',$market);$result['today_high_price'] = $redis->hget('today_high_price',$market);return $result;
}
//获取全部行情
public function getAllMarket(){$redis = $this->newRedis();$market_buy_list = $redis -> HgetAll('real_market_buy');$market_sell_list = $redis -> HgetAll('real_market_sell');$market_real_list = $redis->HgetAll('real_market');foreach ($market_real_list as $k => $v) {$show_market_list[$k]['real'] = $v;$show_market_list[$k]['sell'] = $market_sell_list[$k];$show_market_list[$k]['buy'] = $market_buy_list[$k];}return $show_market_list;
}//获取变动的行情 进行广播
public function getRealMarket(){$redis = $this->newRedis();$market_buy_list = $redis -> HgetAll('real_market_buy');$market_sell_list = $redis -> HgetAll('real_market_sell');$market_real_list = $redis -> HgetAll('real_market');foreach ($market_real_list as $k => $v) {$result['real'] = $v;$result['sell'] = $market_sell_list[$k];$result['buy'] = $market_buy_list[$k];$history_market = $redis -> hGet('history_market',$k);if ($history_market == json_encode($result)) {//这一次读取的数据 和历史数据是一样的continue;}else{$show_market_list[$k] = $result;}}return $show_market_list;
}
//每个IP 每分钟最多10个连接
public function checkMac($mac,$status &#61; 0){$zero_time &#61; strtotime(date(&#39;Y-m-d H:i&#39;));$redis &#61; $this->newRedis();$res &#61; $redis->hGet(&#39;Conn_Mac&#39;,$mac);if ($res) {try {$res &#61; json_decode($res,1);if ($res[&#39;time&#39;] !&#61; $zero_time) {//如果已经超过了一分钟区间$json[&#39;time&#39;] &#61; $zero_time;$json[&#39;rate&#39;] &#61; 1;$redis -> Hset(&#39;Conn_Mac&#39;,$mac,json_encode($json));return true;}else{if ($res[&#39;rate&#39;] <100) {//如果连接数还没有达到上限$json &#61; json_encode([&#39;time&#39; &#61;> $zero_time,&#39;rate&#39; &#61;> $res[&#39;rate&#39;] &#43; 1]);$redis ->Hset(&#39;Conn_Mac&#39;,$mac,$json);return true;}else{//连接次数达到上线return false;}}} catch (Exception $e) {$json[&#39;time&#39;] &#61; $zero_time;$json[&#39;rate&#39;] &#61; 1;$redis ->Hset(&#39;Conn_Mac&#39;,$mac,json_encode($json));return true;}}else{$arr[&#39;time&#39;] &#61; $zero_time;$arr[&#39;rate&#39;] &#61; 1;$redis -> hSet(&#39;Conn_Mac&#39;,$mac,json_encode($arr));return true;}
}
public function newRedis(){$redis &#61; new redis();$redis -> connect($this->config[&#39;read_redis_host&#39;],$this->config[&#39;read_redis_prot&#39;]);$redis -> auth($this->config[&#39;read_redis_auth&#39;]);return $redis;
}

}
new Websocket($config);
?>
Swoole实现Websocket每秒钟主动推送
Swoole实现Websocket每秒钟主动推送

转:https://blog.51cto.com/14218196/2359306



推荐阅读
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了django中视图函数的使用方法,包括如何接收Web请求并返回Web响应,以及如何处理GET请求和POST请求。同时还介绍了urls.py和views.py文件的配置方式。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • express工程中的json调用方法
    本文介绍了在express工程中如何调用json数据,包括建立app.js文件、创建数据接口以及获取全部数据和typeid为1的数据的方法。 ... [详细]
  • Servlet多用户登录时HttpSession会话信息覆盖问题的解决方案
    本文讨论了在Servlet多用户登录时可能出现的HttpSession会话信息覆盖问题,并提供了解决方案。通过分析JSESSIONID的作用机制和编码方式,我们可以得出每个HttpSession对象都是通过客户端发送的唯一JSESSIONID来识别的,因此无需担心会话信息被覆盖的问题。需要注意的是,本文讨论的是多个客户端级别上的多用户登录,而非同一个浏览器级别上的多用户登录。 ... [详细]
  • 本文介绍了Java后台Jsonp处理方法及其应用场景。首先解释了Jsonp是一个非官方的协议,它允许在服务器端通过Script tags返回至客户端,并通过javascript callback的形式实现跨域访问。然后介绍了JSON系统开发方法,它是一种面向数据结构的分析和设计方法,以活动为中心,将一连串的活动顺序组合成一个完整的工作进程。接着给出了一个客户端示例代码,使用了jQuery的ajax方法请求一个Jsonp数据。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了在使用Python中的aiohttp模块模拟服务器时出现的连接失败问题,并提供了相应的解决方法。文章中详细说明了出错的代码以及相关的软件版本和环境信息,同时也提到了相关的警告信息和函数的替代方案。通过阅读本文,读者可以了解到如何解决Python连接服务器失败的问题,并对aiohttp模块有更深入的了解。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • ASP.NET2.0数据教程之十四:使用FormView的模板
    本文介绍了在ASP.NET 2.0中使用FormView控件来实现自定义的显示外观,与GridView和DetailsView不同,FormView使用模板来呈现,可以实现不规则的外观呈现。同时还介绍了TemplateField的用法和FormView与DetailsView的区别。 ... [详细]
  • node.jsrequire和ES6导入导出的区别原 ... [详细]
author-avatar
白羊幸福的佳佳
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有