Netty实战IM即时通讯系统(十一)pipeline与channelHandler
作者:常思 | 来源:互联网 | 2023-09-06 17:21
Netty实战IM即时通讯系统(十一)pipeline与channelHandler零、目录IM系统简介Netty简介Netty环境配置服务端启动流程客户端启动流程实战:客户端和服
Netty实战 IM即时通讯系统(十一)pipeline与channelHandler
零、 目录
- IM系统简介
- Netty 简介
- Netty 环境配置
- 服务端启动流程
- 客户端启动流程
- 实战: 客户端和服务端双向通信
- 数据传输载体ByteBuf介绍
- 客户端与服务端通信协议编解码
- 实现客户端登录
- 实现客户端与服务端收发消息
- pipeline与channelHandler
- 构建客户端与服务端pipeline
- 拆包粘包理论与解决方案
- channelHandler的生命周期
- 使用channelHandler的热插拔实现客户端身份校验
- 客户端互聊原理与实现
- 群聊的发起与通知
- 群聊的成员管理(加入与退出,获取成员列表)
- 群聊消息的收发及Netty性能优化
- 心跳与空闲检测
- 总结
- 扩展
一、 简介
- 这一小节中 , 我们来学习Netty 中一大核心组件: pipeline 与 channelHandler
- 上一小节最后 , 我们提出: 如何避免switch-case 泛滥? , 我们注意到, 不管是服务端还是客户端 , 处理流程大致分为以下步骤
- 我们把这三类逻辑都写在一个类里面, 客户端写在 ClientHandler , 服务端写在 ServerHandler , 如果要做功能的扩展 (比如 , 我们要校验魔数 , 或者其他特殊逻辑) , 只能在一个类里面去修改 , 这个类就会变得越来越臃肿。
- 另外 , 我们注意到, 每次发指令数据包都要是都调用编码器编码成byteBuf , 对于这类场景的编码优化, 我们能想到的办法自然是模块化处理 , 不同的逻辑放置到单独的类中来处理 , 最后将这些逻辑串联起来 , 形成一个完整的逻辑处理链 。
- Netty中的pipeline 与channelHandler 正是来解决这个问题的: 他通过责任链设计模式来组织代码逻辑 , 并且能够支持逻辑的动态添加和删除 , Netty能够支持各类协议的支持和扩展 , 比如: HTTP , WebSocket ,Redis 靠的就是pipeline与channelHanler。
二、 pipeline 与channelHandler 的构成
![Netty实战 IM即时通讯系统(十一)pipeline与channelHandler Netty实战 IM即时通讯系统(十一)pipeline与channelHandler](https://img2.php1.cn/3cdc5/3909/cd5/d85790914287a914.png)
- 无论是从服务端来看 , 还是从客户端来看 , 在Netty整个框架中, 一条连接对应着一个channel ,这个channel 所有的处理逻辑都在一个叫做ChannelPipeline的对象里面 , ChannelPipeline 是一个双向链表结构 , 他和Channel之间是一对一的关系
- ChannelPipeline 里面每个节点都是一个ChannelHandlerContext 对象 , 这个对象能够拿到和Channel相关的上下文信息 , 然后这个对象抱着一个重要的对象 , 那就是逻辑处理器 ChannelHandler
- 接下啦我们来看一下ChannelHandler有哪些分类
三、 ChannelHandler 的分类
![Netty实战 IM即时通讯系统(十一)pipeline与channelHandler Netty实战 IM即时通讯系统(十一)pipeline与channelHandler](https://img2.php1.cn/3cdc5/3909/cd5/55c4d55d7934b514.png)
- 可以考到ChannelHandler 有两大子接口
- 第一个子接口是ChannelInboundHandler , 从字面意思也可以猜到,他是处理读数据的逻辑 , 比如: 我们在一段读到一段数据 , 首先要解析这段数据 , 然后对这些数据做一些逻辑处理 , 最终把响应写到对端 , 在开始组装响应之前的逻辑 , 都可以放在ChannelInboundHandler 中处理 , 他的一个最重要的方法就是 channelRead , 读者可以将ChannelInboundHandler中的逻辑处理过程与TCP的七层协议的解析连接起来 , 收到的数据一层一层从物理层传送到我们的应用层 。
- 第二个子接口 ChannelOutboundHandler 是处理写数据的逻辑 , 他是定义我们一段在组装完响应之后吧数据写到对端的逻辑 ,比如我们封装好一个response对象 , 接下来我们可能对这个response做一些其他的特殊的逻辑, 然后 , 在编码成byteBuf , 最终写到对端 , 它里面最核心的一个方法就是write() , 读者可以将ChannelOutboundHandler的逻辑处理过程与TCP 的七层协议的封装过程联系起来 , 我们在应用层组装响应之后 , 通过层层协议的封装 , 直到最底层的物理层。
- 这两个子接口分别有对应的默认的实现 , ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter , 他们分别实现了两大接口的所有功能 , 默认情况下回吧读写事件传播到下一个handler。
- 说了这么多理论, 其实还是比较抽象的 , 下面我们就用一个具体的demo 来学习一下这两个handler的事件传播机制。
四、 ChannelInboundHanndler 的事件传播
-
关于ChannelInboundHandler , 我们拿 ChannelRead() 为例子 , 来体验一下inbound时间的传播
-
我们在服务端的pipeline 添加三个ChannelInboundHandler
Test_12_Server.java
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Test_11_InboundHandlerA());
ch.pipeline().addLast(new Test_11_InboundHandlerB());
ch.pipeline().addLast(new Test_11_InboundHandlerC());
}
});
-
每个inBoundHandler 继承自ChannelInboundHandlerAdapter , 然后实现了 channelRead() 方法
/**
* 2019年1月29日
* @author outman
* 服务端处理 A
*/
class Test_11_InboundHandlerA extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf)msg;
System.out.println("Test_11_InboundHandlerA --> " + buffer.toString(Charset.forName("UTF-8")));
super.channelRead(ctx, msg);
}
}
/**
* 2019年1月29日
* @author outman
* 服务端处理 B
*/
class Test_11_InboundHandlerB extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf)msg;
System.out.println("Test_11_InboundHandlerB --> " + buffer.toString(Charset.forName("UTF-8")));
super.channelRead(ctx, msg);
}
}
/**
* 2019年1月29日
* @author outman
* 服务端处理 C
*/
class Test_11_InboundHandlerC extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf)msg;
System.out.println("Test_11_InboundHandlerC --> " + buffer.toString(Charset.forName("UTF-8")));
super.channelRead(ctx, msg);
}
}
- 在channelRead() 方法里面 , 我们当前handler 的信息 , 然后调用父类的channelRead() 方法 , 而这里父类的channelRead()方法会自定调用下一个inboundHandler的channelRead() , 并且会把处理完毕的对象传递给下一个inboundHandler , 我们例子中传递的对象都是同一个msg 。
- 我们通过 addLast()方法来为pipeline添加 inboundHandler , 当然 , 除了这个方法还有其他的方法 , 感兴趣的同学可以去浏览一下pipeline的api ,这里我们添加的顺序为 A --> B --> c , 然后我们来看一下控制台输出
五、ChannelOutboundHandler 的时间传播
-
关于ChannelOutboundHandler , 我们拿write()为例子,来体验一下outbound事件的传播。
-
我们继续在服务端的pipeline添加三个ChannelOutboundHandler
Test_12_Server.java
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// inbound 服务端读数据逻辑
ch.pipeline().addLast(new Test_11_InboundHandlerA());
ch.pipeline().addLast(new Test_11_InboundHandlerB());
ch.pipeline().addLast(new Test_11_InboundHandlerC());
// outbound 服务端系数据逻辑
ch.pipeline().addLast(new Test_11_OutboundHandlerA());
ch.pipeline().addLast(new Test_11_OutboundHandlerB());
ch.pipeline().addLast(new Test_11_OutboundHandlerC());
}
});
-
每个outboundHandler都继承自ChannelOutboundHandlerAdapter , 然后实现了write()方法
/**
* 2019年2月14日
* @author outman
*
* 服务端写数据逻辑 A
*/
class Test_12_OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("Test_12_OutboundHandlerA --> " + msg);
super.write(ctx, msg, promise);
}
}
/**
* 2019年2月14日
* @author outman
*
* 服务端写数据逻辑 B
*/
class Test_12_OutboundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("Test_12_OutboundHandlerB --> " + msg);
super.write(ctx, msg, promise);
}
}
/**
* 2019年2月14日
* @author outman
*
* 服务端写数据逻辑 C
*/
class Test_12_OutboundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("Test_12_OutboundHandlerC --> " + msg);
super.write(ctx, msg, promise);
}
}
- 在write()方法里面 , 我们打印当前handler的信息,然后调用父类write()方法,而这里父类的write()方法会自动调用到下一个outboundHandler的write()方法,并且把当前outboundHandler里处理完毕的对象传递到下一个outboundHandler.
- 我们通过addLast()方法添加outboundHandler的顺序为 A -> B -> C , 最后我们来看一下控制台的输出
-
可以看到outboundHandler的执行顺序与我们添加的顺序相反 , 最后,我们在来看一下pipeline的结构和执行顺序。
- pipeline 的结构
不管我们定义的是那种类型的handler, 最终他们都以双向链表的形式连接,这里实际链表的节点是ChannelHandlerContext , 这里为了让结构清晰突出,可以直接把节点看做ChannelHandlerContext
- pipeline 的执行顺序
虽然两种类型的handler在一个双向链表里, 但是这两类handler 的分工是不一样的,inboundHandler的事件通常只会传播到下一个channelHandler,outboundHandler的事件通常通常只会传播到下一个outboundHandler , 两者互不干扰。
六 、 总结
- 通过我们前面编写客户端、服务端处理逻辑引出了pipeline和channelHandler的概念
- channelHandler分为inbound和outbound两种类型的接口 , 分别是处理数据读和数据写的逻辑
- 两种类型的handler均有相应的默认实现,默认会把事件传递到下一个 , 这里的传递事件其实说白了就是把本handler的处理结果传递给下一个handler继续处理
- inboundHandler的执行顺序与我们实际的添加顺序相同 ,而outboundHandler 相反。
七 、 思考
- 参考本文中的例子 , 如果我们往pipeline里面添加handler的顺序不变,要在控制台打印出inboundA -> inboundB -> outboundB -> outboundA , 该如何实现?
- 如何在每个handler里面打印上一个handler 的处理结束的时间点呢?
- 答: 可以将上一个handler处理结束的时间放在channel的attr中
- 如何让outboundHandler按照添加顺序执行?
- 答: pipeline有andLast() 和andFrist()方法 ,inboundHandler在执行过程中正向遍历链表 使用andLast()方法先进先出可以顺序执行 , outboundHandler在执行过程中逆向遍历链表, 使用andFrist()方法 先进后出可以顺序执行 。
- OutBoundHandler一直没有被执行到,pipeline也已添加,有可能是什么原因呢?
- 答: ctx.channel().writeAndFlush(msg); 进行写事件才会触发out调用链 。 所以需要将inboundHandlerC 中执行写逻辑 才会执行outboundHandler.
推荐阅读
-
本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ...
[详细]
蜡笔小新 2023-12-10 17:25:31
-
本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ...
[详细]
蜡笔小新 2023-12-14 18:02:45
-
-
本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ...
[详细]
蜡笔小新 2023-12-14 16:21:36
-
本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ...
[详细]
蜡笔小新 2023-12-13 17:08:24
-
本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ...
[详细]
蜡笔小新 2023-12-13 10:56:31
-
本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ...
[详细]
蜡笔小新 2023-12-12 15:58:44
-
本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ...
[详细]
蜡笔小新 2023-12-11 14:17:13
-
EzPP发布了0.2.1版本,新增了YAML布局渲染功能,可以将YAML文件渲染为图片,并且可以复用YAML作为模版,通过传递不同参数生成不同的图片。这个功能可以用于绘制Logo、封面或其他图片,让用户不需要安装或卸载Photoshop。文章还提供了一个入门例子,介绍了使用ezpp的基本渲染方法,以及如何使用canvas、text类元素、自定义字体等。 ...
[详细]
蜡笔小新 2023-12-11 12:39:10
-
本文介绍了使用圣杯布局模式实现网站首页的内容布局的方法,包括HTML部分代码和实例。同时还提供了公司新闻、最新产品、关于我们、联系我们等页面的布局示例。商品展示区包括了车里子和农家生态土鸡蛋等产品的价格信息。 ...
[详细]
蜡笔小新 2023-12-10 20:09:23
-
本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ...
[详细]
蜡笔小新 2023-12-10 15:47:11
-
本文介绍了2015年九月八日的js学习总结及相关知识点,包括参考书《javaScript Dom编程的艺术》、js简史、Dom、DHTML、解释型程序设计和编译型程序设计等内容。同时还提到了最佳实践是将标签放到HTML文档的最后,并且对语句和注释的使用进行了说明。 ...
[详细]
蜡笔小新 2023-12-10 11:10:08
-
本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ...
[详细]
蜡笔小新 2023-12-09 19:11:31
-
本文介绍了在python中安装并使用redis的相关知识,包括redis的数据缓存系统和支持的数据类型,以及在pycharm中安装redis模块和常用的字符串操作。 ...
[详细]
蜡笔小新 2023-12-09 10:31:54
-
Python已成为全球最受欢迎的编程语言之一,然而Python程序的安全运行存在一定的风险。本文介绍了Python程序安全运行需要满足的三个条件,即系统路径上的每个条目都处于安全的位置、"主脚本"所在的目录始终位于系统路径中、若python命令使用-c和-m选项,调用程序的目录也必须是安全的。同时,文章还提出了一些预防措施,如避免将下载文件夹作为当前工作目录、使用pip所在路径而不是直接使用python命令等。对于初学Python的读者来说,这些内容将有所帮助。 ...
[详细]
蜡笔小新 2023-12-09 10:20:23
-
本篇将由环境搭建、实现原理、编程开发、插件开发、编译运行、性能稳定、发展未来等七个方面,对当前的ReactNative和Flutter进行全面的分析对比, ...
[详细]
蜡笔小新 2023-10-17 11:44:05
-