热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

php中Redis的应用消息传递的示例代码分享(图)

消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。本文主要介绍了php中Redis的应用--消息传递。下面跟着小编一起来看下吧阅读目录1、摘要2、实现方法3、一

消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。本文主要介绍了php中Redis的应用–消息传递。下面跟着小编一起来看下吧


阅读目录


1、摘要


2、实现方法


3、一对一消息传递


4、多对多消息传递


1、摘要


消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。常见的消息传递应用有,新浪微博中的@我呀、给你评论然后的提示呀、赞赞赞提示、私信呀、甚至是发微博分享的新鲜事;知乎中的私信呀、live发送过来的消息、知乎团队消息呀等等。


2、实现方法


消息传递即两个或者多个客户端在相互发送和接收消息。


通常有两种方法实现:


第一种为消息推送。Redis内置有这种机制,publish往频道推送消息、subscribe订阅频道。这种方法有一个缺点就是必须保证接收者时刻在线(即是此时程序不能停下来,一直保持监控状态,假若断线后就会出现客户端丢失信息)


第二种为消息拉取。所谓消息拉取,就是客户端自主去获取存储在服务器中的数据。Redis内部没有实现消息拉取这种机制。因此我们需要自己手动编写代码去实现这个功能。


在这里我们,我们进一步将消息传递再细分为一对一的消息传递,多对多的消息传递(群组消息传递)。


【注:两个类的代码相对较多,因此将其折叠起来了】


3、一对一消息传递


例子1:一对一消息发送与获取


模块要求:


1、提示有多少个联系人发来新消息


2、信息包含发送人、时间、信息内容


3、能够获取之前的旧消息


4、并且消息能够保持7天,过期将会被动触发删除


Redis实现思路:


1、新消息与旧消息分别采用两个链表来存储


2、原始消息的结构采用数组的形式存放,并且含有发送人、时间戳、信息内容


3、在推入redis的链表前,需要将数据转换为json类型然后再进行存储


4、在取出新信息时应该使用rpoplpush来实现,将已读的新消息推入旧消息链表中


5、取出旧消息时,应该用旧消息的时间与现在的时间进行对比,若超时,则直接删除后面的全部数据(因为数据是按时间一个一个压进链表中的,所以对于时间是有序排列的)


数据存储结构图:


PHP的实现代码:


#SinglePullMessage.class.php


redis=new Redis(); $this->redis->connect($host,$port); }  /** * @desc 发送消息(一个人) *  * @param $toUser string | 接收人 * @param $messageArr array | 发送的消息数组,包含sender、message、time  * * @return bool */ public function sendSingle($toUser,$messageArr) { $json_message=json_encode($messageArr); #编码成json数据 return $this->redis->lpush($toUser,$json_message); #将数据推入链表  } /** * @desc 用户获取新消息 * * @param $user string | 用户名 * * @return array 返回数组,包含多少个用户发来新消息,以及具体消息 */ public function getNewMessage($user) { #接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除 $messageArr=array(); while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user)) {  $temp=json_decode($json_message); #将json数据变成对象  $messageArr[$temp->sender][]=$temp; #转换成数组信息 } if($messageArr) {  $arr['count']=count($messageArr); #统计有多少个用户发来信息  $arr['messageArr']=$messageArr;  return $arr; } return false; } public function getPreMessage($user) { ##取出旧消息 $messageArr=array(); $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性将全部旧消息取出来 foreach ($json_pre as $k => $v)  {  $temp=json_decode($v);  #json反编码  $timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期  if($timeoutredis->del('preMessage_'.$user);   break;  }  $this->redis->ltrim('preMessage_'.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除  break;  }  $messageArr[$temp->sender][]=$temp; } return $messageArr; } /** * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。  * * @param $arr array | 需要处理的信息数组 * * @return 返回打印输出 */ public function dealArr($arr) { foreach ($arr as $k => $v)  {  foreach ($v as $k1 => $v2)   {  echo '发送人:'.$v2->sender.' 发送时间:'.date('Y-m-d h:i:s',$v2->time).'
'; echo '消息内容:'.$v2->message.'
'; } echo "
"; } }}

测试:


1、发送消息


#建立test1.php


include './SinglePullMessage.class.php';$object=new SinglePullMessage('192.168.95.11');#发送消息$sender='boss'; #发送者$to='jane';  #接收者$message='How are you'; #信息$time=time();$arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);echo $object->sendSingle($to,$arr);

2、获取新消息


#建立test2.php


include './SinglePullMessage.class.php';$object=new SinglePullMessage('192.168.95.11');#获取新消息$arr=$object->getNewMessage('jane');if($arr){ echo $arr['count']."个联系人发来新消息

"; $object->dealArr($arr['messageArr']); }else echo "无新消息";

访问结果:


3、获取旧消息


#建立test3.php


include './SinglePullMessage.class.php';$object=new SinglePullMessage('192.168.95.11');#获取旧消息$arr=$object->getPreMessage('jane');if($arr){ $object->dealArr($arr);}else echo "无旧数据";

4、多对多消息传递


例子2:多对多消息发送与获取(即是群组)


模块要求:


1、用户能够自行创建群组,并成为群主


2、群主可以拉人进来作为群组成员、并且可以踢人


3、用户可以直接退出群组


4、可以发送消息,每一位成员都可以拉取消息


5、群组的消息最大容纳量为5000条


6、成员可以拉取新消息,并提示有多少新消息


7、成员可以分页获取之前已读的旧消息


。。。。。功能就写这几个吧,有需要或者想练习的同学们可以增加其他功能,例如禁言、匿名消息发送、文件发送等等。


Redis实现思路:


1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。


2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值)


3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID


4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID


5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作【redis详细锁的讲解可以参考:Redis构建分布式锁http://www.gaodaima.com/article/109704.htm】


6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。


数据存储结构图:


PHP的代码实现:


#ManyPullMessage.class.php


redis=new Redis(); $this->redis->connect($host,$port); }  /** * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组 *  * @param $user string | 用户名,创建群组的主人 * @param $addUser array | 其他用户构成的数组 * * @param $lockName string | 锁的名字,用于获取群组ID的时候用 * @return int 返回群组ID */ public function createGroupChat($user, $addUser=array(), $lockName=&#039;chatIdLock&#039;) { $identifier=$this->getLock($lockName); #获取锁 if($identifier) {  $id=$this->redis->incr(&#039;groupChatID&#039;); #获取群组ID  $this->releaseLock($lockName,$identifier); #释放锁 } else  return false; $messageCount=$this->redis->set(&#039;countMessage_&#039;.$id, 0); #初始化这个群组消息计数器 #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接 $pipe=$this->redis->pipeline();  $this->redis->zadd(&#039;groupChat_&#039;.$id, 1, $user); #创建群组成员有序集合,并添加群主 #将这个群组添加到user所参加的群组有序集合中 $this->redis->zadd(&#039;hasGroupChat_&#039;.$user, 0, $id);  foreach ($addUser as $v) #创建群组的同时需要添加的用户成员 {  $this->redis->zadd(&#039;groupChat_&#039;.$id, 2, $v);  $this->redis->zadd(&#039;hasGroupChat_&#039;.$v, 0, $id); } $pipe->exec(); return $id; #返回群组ID } /** * @desc 群主主动拉人进群 * * @param $user string | 群主名 * @param $groupChatID int | 群组ID * @param $addMembers array | 需要拉进群的用户 * * @return bool */ public function addMembers($user, $groupChatID, $addMembers=array()) { $groupMasterScore=$this->redis->zscore(&#039;groupChat_&#039;.$groupChatID, $user); #将groupChatName的群主取出来 if($groupMasterScore==1) #判断user是否是群主 {  $pipe=$this->redis->pipeline(); #开启非事务流水线  foreach ($addMembers as $v)   {  $this->redis->zadd(&#039;groupChat_&#039;.$groupChatID, 2, $v);   #添加进群  $this->redis-、本文来源gao($daima.com搞@代@#码$网搞gaodaima代码>zadd(&#039;hasGroupChat_&#039;.$v, 0, $groupChatID); #添加群名到用户的有序集合中  }  $pipe->exec();  return true; } return false; } /** * @desc 群主删除成员 * * @param $user string | 群主名 * @param $groupChatID int | 群组ID * @param $delMembers array | 需要删除的成员名字 * * @return bool */ public function delMembers($user, $groupChatID, $delMembers=array()) { $groupMasterScore=$this->redis->zscore(&#039;groupChat_&#039;.$groupChatID, $user);  if($groupMasterScore==1) #判断user是否是群主 {  $pipe=$this->redis->pipeline(); #开启非事务流水线  foreach ($delMembers as $v)   {  $this->redis->zrem(&#039;groupChat_&#039;.$groupChatID, $v);     $this->redis->zrem(&#039;hasGroupChat_&#039;.$v, $groupChatID);   }  $pipe->exec();  return true; } return false; } /** * @desc 退出群组 * * @param $user string | 用户名 * @param $groupChatID int | 群组名 */ public function quitGroupChat($user, $groupChatID) { $this->redis->zrem(&#039;groupChat_&#039;.$groupChatID, $user); $this->redis->zrem(&#039;hasGroupChat_&#039;.$user, $groupChatID); return true; } /** * @desc 发送消息 * * @param $user string | 用户名 * @param $groupChatID int | 群组ID * @param $messageArr array | 包含发送消息的数组 * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID * * @return bool */ public function sendMessage($user, $groupChatID, $messageArr, $preLockName=&#039;countLock_&#039;) { $memberScore=$this->redis->zscore(&#039;groupChat_&#039;.$groupChatID, $user); #成员score if($memberScore) {  $identifier=$this->getLock($preLockName.$groupChatID); #获取锁  if($identifier) #判断获取锁是否成功  {  $messageCount=$this->redis->incr(&#039;countMessage_&#039;.$groupChatID);  $this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁  }  else  return false;  $json_message=json_encode($messageArr);  $this->redis->zadd(&#039;groupChatMessage_&#039;.$groupChatID, $messageCount, $json_message);  $count=$this->redis->zcard(&#039;groupChatMessage_&#039;.$groupChatID); #查看信息量大小  if($count>5000) #判断数据量有没有达到5000条  { #数据量超5000,则需要清除旧数据  $start=5000-$count;  $this->redis->zremrangebyrank(&#039;groupChatMessage_&#039;.$groupChatID, $start, $count);  }  return true; } return false; } /** * @desc 获取新信息 * * @param $user string | 用户名 * * @return 成功则放回json数据数组,无新信息返回false */ public function getNewMessage($user) { $arrID=$this->redis->zrange(&#039;hasGroupChat_&#039;.$user, 0, -1, &#039;withscores&#039;); #获取用户拥有的群组ID $json_message=array(); #初始化 foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息 {  $messageCount=$this->redis->get(&#039;countMessage_&#039;.$k); #群组最大信息分值数  if($messageCount>$v) #判断用户是否存在未读新消息  {  $json_message[$k][&#039;message&#039;]=$this->redis->zrangebyscore(&#039;groupChatMessage_&#039;.$k, $v+1, $messageCount);  $json_message[$k][&#039;count&#039;]=count($json_message[$k][&#039;message&#039;]); #统计新消息数量  $this->redis->zadd(&#039;hasGroupChat_&#039;.$user, $messageCount, $k); #更新已获取消息  }  } if($json_message)  return $json_message; return false; } /** * @desc 分页获取群组信息 * * @param $user string | 用户名  * @param $groupChatID int | 群组ID * @param $page int | 第几页 * @param $size int | 每页多少条数据 * * @return 成功返回json数据,失败返回false */ public function getPartMessage($user, $groupChatID, $page=1, $size=10) { $start=$page*$size-$size; #开始截取数据位置 $stop=$page*$size-1; #结束截取数据位置 $json_message=$this->redis->zrevrange(&#039;groupChatMessage_&#039;.$groupChatID, $start, $stop); if($json_message)  return $json_message; return false; } /** * @desc 加锁方法 * * @param $lockName string | 锁的名字 * @param $timeout int | 锁的过期时间 * * @return 成功返回identifier/失败返回false */ public function getLock($lockName, $timeout=2) { $identifier=uniqid(); #获取唯一标识符 $timeout=ceil($timeout); #确保是整数 $end=time()+$timeout; while(time()<$end)  #循环获取锁 {  /*  #这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯  if($this->redis->set($lockName, $identifier array(&#039;nx&#039;, &#039;ex&#039;=>$timeout)))  return $identifier;  */  if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上锁  {  $this->redis->expire($lockName, $timeout); #为$lockName设置过期时间  return $identifier;    #返回一维标识符  }  elseif ($this->redis->ttl($lockName)===-1)   {  $this->redis->expire($lockName, $timeout); #检测是否有设置过期时间,没有则加上  }  usleep(0.001);  #停止0.001ms } return false; } /** * @desc 释放锁 * * @param $lockName string | 锁名 * @param $identifier string | 锁的唯一值 * * @param bool */ public function releaseLock($lockName,$identifier) { if($this->redis->get($lockName)==$identifier) #判断是锁有没有被其他客户端修改 {   $this->redis->multi();  $this->redis->del($lockName); #释放锁  $this->redis->exec();  return true; } else {  return false; #其他客户端修改了锁,不能删除别人的锁 } }}?>

测试:


1、建立createGroupChat.php(测试创建群组功能)


执行代码并创建568、569群组(群主为jack)



include &#039;./ManyPullMessage.class.php&#039;;$object=new ManyPullMessage(&#039;192.168.95.11&#039;);#创建群组$user=&#039;jack&#039;;$arr=array(&#039;jane1&#039;,&#039;jane2&#039;);$a=$object->createGroupChat($user,$arr);echo "
";print_r($a);echo "
";die;

2、建立addMembers.php(测试添加成员功能)


执行代码并添加新成员


 include &#039;./ManyPullMessage.class.php&#039;; $object=new ManyPullMessage(&#039;192.168.95.11&#039;); $b=$object->addMembers(&#039;jack&#039;,&#039;568&#039;,array(&#039;jane1&#039;,&#039;jane2&#039;,&#039;jane3&#039;,&#039;jane4&#039;)); echo "
"; print_r($b); echo "
";die;

3、建立delete.php(测试群主删除成员功能)


include &#039;./ManyPullMessage.class.php&#039;;$object=new ManyPullMessage(&#039;192.168.95.11&#039;);#群主删除成员$c=$object->delMembers(&#039;jack&#039;, &#039;568&#039;, array(&#039;jane1&#039;,&#039;jane4&#039;));echo "
";print_r($c);echo "
";die;

4、建立sendMessage.php(测试发送消息功能)


多执行几遍,568、569都发几条


include &#039;./ManyPullMessage.class.php&#039;;$object=new ManyPullMessage(&#039;192.168.95.11&#039;);#发送消息$user=&#039;jane2&#039;;$message=&#039;go go go&#039;;$groupChatID=568;$arr=array(&#039;sender&#039;=>$user, &#039;message&#039;=>$message, &#039;time&#039;=>time());$d=$object->sendMessage($user,$groupChatID,$arr);echo "
";print_r($d);echo "
";die;

5、建立getNewMessage.php(测试用户获取新消息功能)


include &#039;./ManyPullMessage.class.php&#039;;$object=new ManyPullMessage(&#039;192.168.95.11&#039;);#用户获取新消息$e=$object->getNewMessage(&#039;jane2&#039;);echo "
";print_r($e);echo "
";die;

6、建立getPartMessage.php(测试用户获取某个群组部分消息)


(多发送几条消息,用于测试。568中共18条数据)


include &#039;./ManyPullMessage.class.php&#039;;$object=new ManyPullMessage(&#039;192.168.95.11&#039;);#用户获取某个群组部分消息$f=$object->getPartMessage(&#039;jane2&#039;, 568, 1, 10); echo "
";print_r($f);echo "
";die;

page=1,size=10


page=2,size=10


测试完毕,还需要别的功能可以自己进行修改添加测试。


以上就是php中Redis的应用-消息传递的示例代码分享(图)的详细内容,更多请关注gaodaima其它相关文章!




推荐阅读
  • 本文详细探讨了 Java 中 Daemon 线程的特点及其应用场景,并深入分析了 Random 类的源代码,帮助开发者更好地理解和使用这些核心组件。 ... [详细]
  • 本文详细记录了《PHP与MySQL Web开发》第一章的学习心得,特别关注了PHP的基本构成元素、标记风格、编程注意事项及表单处理技巧等内容。 ... [详细]
  • scrapyredis分布式爬虫 ... [详细]
  • 深入理解Redis集群机制
    本文旨在深入探讨Redis集群的工作原理,包括其架构设计、数据分布策略、节点通信及故障恢复机制等方面的内容。 ... [详细]
  • Node.js中子进程的创建与管理详解
    本文深入探讨了Node.js中如何使用child_process模块来创建和管理子进程,包括exec、spawn和fork三种方法的具体应用及其实现细节。 ... [详细]
  • CGroups: 资源管理和控制
    CGroups(Control Groups)是Linux内核提供的一个功能,旨在限制、记录和隔离进程组使用的物理资源,如CPU、内存和I/O等。它通过精细的资源管理,支持现代容器技术如Docker的资源限制需求。 ... [详细]
  • 深入解析Java中的锁类型及其应用场景
    本文详细介绍了Java中常见的锁类型,包括乐观锁与悲观锁、独占锁与共享锁、互斥锁与读写锁、可重入锁、公平锁与非公平锁、分段锁、偏向锁、轻量级锁、重量级锁以及自旋锁。每种锁的特性、作用及适用场景均有所涉及。 ... [详细]
  • 软件项目管理实践——Jenkins持续集成与Maven、GitHub的应用
    本文详细介绍了如何利用Jenkins实现持续集成,以及Maven和GitHub在项目开发中的具体应用。包括环境搭建、工具安装、项目创建及代码管理等关键步骤。 ... [详细]
  • 本文详细介绍了在Hive中创建表的基本语法,包括临时表、外部表的创建方法,以及如何设置表的各种属性和约束条件。 ... [详细]
  • 解决fetch上传图片至微信公众号H5页面的问题
    在近期的一个项目需求中,需要在微信公众号内嵌入H5页面,并实现用户通过该页面上传图片的功能,包括拍摄新照片或从已有相册中选择。前端开发中采用了fetch API进行接口调用,但遇到了上传图片时数据无法正确传递的问题。 ... [详细]
  • Kafka组件详解及工作原理
    本文介绍了Apache Kafka的核心组件及其工作方式,包括生产者(Producer)、消费者(Consumer)、主题(Topic)、代理(Broker)、分区(Partition)、消费者组(Consumer Group)和偏移量(Offset),并探讨了这些组件之间的交互机制。 ... [详细]
  • 深入理解FastDFS
    FastDFS是一款高效、简洁的分布式文件系统,广泛应用于互联网应用中,用于处理大量用户上传的文件,如图片、视频等。本文探讨了FastDFS的设计理念及其如何通过独特的架构设计提高性能和可靠性。 ... [详细]
  • Redis 教程01 —— 如何安装 Redis
    本文介绍了 Redis,这是一个由 Salvatore Sanfilippo 开发的键值存储系统。Redis 是一款开源且高性能的数据库,支持多种数据结构存储,并提供了丰富的功能和特性。 ... [详细]
  • 本文详细介绍了Redis中对象的内部结构,包括数据类型、编码方式、最近访问时间(LRU)和引用计数等关键属性。通过这些属性,Redis能够高效地管理和优化内存使用。 ... [详细]
  • 微服务架构详解及其入门指南
    本文详细介绍了微服务的基本概念、发展历程、与传统架构的区别及优势,并探讨了适合采用微服务架构的场景。此外,文章还深入分析了几个主流的微服务开发框架,特别是Spring Cloud的组成和特点。 ... [详细]
author-avatar
可卡因
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有