热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

原文转载自「刘悦的技术博客」https:v3u.cna_id_202“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_202

“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。

为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:

1.能够实时接收来自其他客户端的信息。

2.能够将每条信息实时推送给收件人。

当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1

pip3 install tornado==6.1

随后编写程序启动文件main.py:

import tornado.httpserver
import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表
users = [] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self)# 建立torando实例 app = tornado.web.Application( [ (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()

如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。

下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py

import redis r = redis.Redis()
r.publish("test",'hello')

随后编写 client.py:

import redis
r = redis.Redis()
ps = r.pubsub()
ps.subscribe('test')
for item in ps.listen(): if item['type'] == 'message': print(item['data'])

可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:

根据发布者订阅者逻辑,改写main.py:

import tornado.httpserver
import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表
users = [] # 频道列表
channels = ["channel_1","channel_2"] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self) # 基于redis监听发布者发布消息
def redis_listener(loop): asyncio.set_event_loop(loop) async def listen(): r = redis.Redis(decode_responses=True) # 声明pubsb实例 ps = r.pubsub() # 订阅聊天室频道 ps.subscribe(["channel_1","channel_2"]) # 监听消息 for message in ps.listen(): print(message) # 遍历链接上的用户 for user in users: print(user) if message["type"] == "message" and message["channel"] == user.get_COOKIE("channel"): user.write_message(message["data"]) future = asyncio.gather(listen()) loop.run_until_complete(future) # 接口 发布信息
class Msg(tornado.web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = redis.Redis() r.publish(channel,data) return self.write("ok") # 建立torando实例 app = tornado.web.Application( [ (r'/send/',Msg), (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': loop = asyncio.new_event_loop() # 单线程启动订阅者服务 threading.Thread(target=redis_listener,args=(loop,)).start() # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()

这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端COOKIE的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。

需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:

IOLoop.current() doesn't work in non-main

这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。

下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:


这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入COOKIE,便于后端服务识别后匹配推送。

效果是这样的:

诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。

这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:

pip3 install aioredis

aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。

此时,可以新建一个异步订阅服务文件main_with_aioredis.py:

import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout

之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader:

async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub))

在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:

async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_COOKIE("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass

最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:

if __name__ == '__main__': # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()

完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:

import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout users = [] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self) class Msg(web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = await aioredis.from_url("redis://localhost", decode_responses=True) await r.publish(channel,data) return self.write("ok") async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_COOKIE("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub)) application = web.Application([ (r'/send/',Msg), (r'/wb/', WB),
],debug=True) if __name__ == '__main__': # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()

从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。

结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom

原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_202


推荐阅读
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
  • 本文详细介绍如何在VSCode中配置自定义代码片段,使其具备与IDEA相似的代码生成快捷键功能。通过具体的Java和HTML代码片段示例,展示配置步骤及效果。 ... [详细]
  • 本文详细记录了在基于Debian的Deepin 20操作系统上安装MySQL 5.7的具体步骤,包括软件包的选择、依赖项的处理及远程访问权限的配置。 ... [详细]
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • Python自动化处理:从Word文档提取内容并生成带水印的PDF
    本文介绍如何利用Python实现从特定网站下载Word文档,去除水印并添加自定义水印,最终将文档转换为PDF格式。该方法适用于批量处理和自动化需求。 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 深入理解Redis的数据结构与对象系统
    本文详细探讨了Redis中的数据结构和对象系统的实现,包括字符串、列表、集合、哈希表和有序集合等五种核心对象类型,以及它们所使用的底层数据结构。通过分析源码和相关文献,帮助读者更好地理解Redis的设计原理。 ... [详细]
  • 本文将介绍如何使用 Go 语言编写和运行一个简单的“Hello, World!”程序。内容涵盖开发环境配置、代码结构解析及执行步骤。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 360SRC安全应急响应:从漏洞提交到修复的全过程
    本文详细介绍了360SRC平台处理一起关键安全事件的过程,涵盖从漏洞提交、验证、排查到最终修复的各个环节。通过这一案例,展示了360在安全应急响应方面的专业能力和严谨态度。 ... [详细]
  • 本文介绍如何通过Windows批处理脚本定期检查并重启Java应用程序,确保其持续稳定运行。脚本每30分钟检查一次,并在需要时重启Java程序。同时,它会将任务结果发送到Redis。 ... [详细]
  • 本文详细介绍了Python编程语言的学习路径,涵盖基础语法、常用组件、开发工具、数据库管理、Web服务开发、大数据分析、人工智能、爬虫开发及办公自动化等多个方向。通过系统化的学习计划,帮助初学者快速掌握Python的核心技能。 ... [详细]
author-avatar
禎冬魔_784
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有