热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty协议应用和设计

一、协议的重要性客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。redis的协议redis协议遵循的规则是位数-命令,位数-命令,位

一、协议的重要性

客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。

redis的协议

redis协议遵循的规则是 位数-命令,位数-命令,位数-命令....

例子:

package com.test.netty.c7; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @Slf4j public class RedisClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); //按照redis的协议 final byte[] LINE = {'\r', '\n'}; try { ChannelFuture cOnnect= new Bootstrap().group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf =ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$4".getBytes()); buf.writeBytes(LINE); buf.writeBytes("name".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$5".getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbbbb".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug(((ByteBuf) msg).toString(Charset.defaultCharset())); } }).connect(new InetSocketAddress("localhost", 6379)); connect.sync(); connect.channel().close().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }

写完之后,在redis端查询"name":

HTTP协议

HTTP协议相对比较复杂,Netty已经共了服务器端编码和解码的工具类HttpServerCodec来处理HTTP请求。

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器 public final class HttpServerCodec extends CombinedChannelDuplexHandler implements HttpServerUpgradeHandler.SourceCodec

服务器端代码:

package com.test.netty.c7; import com.test.nio.c3.block.Server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; @Slf4j public class HttpServer { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { log.debug("请求地址:{}",msg.uri()); //设置响应内容,版本、状态码 DefaultFullHttpResponse respOnse= new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); //设置响应内容 //设置长度,否则浏览器一直在读取中 byte[] bytes = "

Hello,MG!

".getBytes(StandardCharsets.UTF_8); response.headers().setInt(CONTENT_LENGTH, bytes.length); //设置响应体 response.content().writeBytes(bytes); //写会响应 ctx.writeAndFlush(response); } }); } }).bind(8080); } }

浏览器访问:

二、自定义协议

自定义协议的要素

  • 魔术:协议双方约定的开头,这样双方能够第一时间知道是否可以解析传递过来的消息
  • 版本号:协议的版本
  • 序列化算法:用什么方式类序列化传递的信息
  • 指令类型:跟业务相关的操作
  • 请求序号:为了双工通信,提供异步的能力
  • 正文长度:正文的长度
  • 消息正文:消息体

实例:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import lombok.extern.slf4j.Slf4j; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.List; @Slf4j public class MessageCodec extends ByteToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { //设置魔数 4个字节 out.writeBytes(new byte[]{'m', 'g', '2' ,'1'}); //设置版本号 out.writeByte(1); //设置序列化方式 out.writeByte(1); //设置指令类型 - 业务 out.writeByte(msg.getMessageType()); //设置请求序号 out.writeInt(msg.getSequenceId()); //补齐 out.writeByte(0xff); //序列化 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); objectOutputStream.writeObject(msg); byte[] bytes = byteArrayOutputStream.toByteArray(); //获得并设置政委长度 out.writeInt(bytes.length); out.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { //魔数 int magicNum = in.readInt(); //版本号 byte version = in.readByte(); //序列化方式 byte seqType = in.readByte(); //指令类型 byte messageType = in.readByte(); //获取请求编号 int sequenceId = in.readInt(); //补齐 in.readByte(); //获取正文长度 int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); out.add(message); log.debug("魔数 {}", magicNum); log.debug("版本 {}", version); log.debug("序列化方式 {}", seqType); log.debug("消息类型 {}", messageType); log.debug("请求序列 {}", sequenceId); log.debug("长度 {}", length); log.debug("消息 {}", message); } }

  • 自定义编码解码handler,实现了 ByteToMessage 接口
  • 在对应的编码、解码方法里面,按照之前介绍的内容进行按顺序进行编写即可

测试类:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class TestCodec { public static void main(String[] args) throws Exception{ EmbeddedChannel channel = new EmbeddedChannel( new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), new LoggingHandler(LogLevel.DEBUG), new MessageCodec() ); LoginRequestMessage loginRequestMessage = new LoginRequestMessage("liming", "2292123a"); ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, loginRequestMessage, byteBuf); channel.writeInbound(byteBuf); } }

运行结果:

 三、Sharable 注解

为了提高handler的使用率,netty使用了注解@Sharable注解对handler进行标识,其实就是一个handler是否是线程安全的(也就是无状态的),如果是线程安全的就可以使用@Sharable进行标识,表明这个handler可以共用,否则每次都需要new 一个新的handler。

之前写的 MessageCodec handler理论上市线程安全的,但是它的父类是线程不安全的,所以使用

@ChannelHandler.Sharable public class MessageSharableCodec extends MessageToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, List out) throws Exception { ... } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { ... } }

MessageToMessageCodec 这个父类即可。


推荐阅读
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
  • 本文详细探讨了Java中的ClassLoader类加载器的工作原理,包括其如何将class文件加载至JVM中,以及JVM启动时的动态加载策略。文章还介绍了JVM内置的三种类加载器及其工作方式,并解释了类加载器的继承关系和双亲委托机制。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • 深入解析ESFramework中的AgileTcp组件
    本文详细介绍了ESFramework框架中AgileTcp组件的设计与实现。AgileTcp是ESFramework提供的ITcp接口的高效实现,旨在优化TCP通信的性能和结构清晰度。 ... [详细]
  • 2017-2018年度《网络编程与安全》第五次实验报告
    本报告详细记录了2017-2018学年《网络编程与安全》课程第五次实验的具体内容、实验过程、遇到的问题及解决方案。 ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍如何使用Samba软件配置CIFS文件共享服务,涵盖安装、配置、权限管理及多用户挂载等关键步骤。通过具体示例和命令行操作,帮助读者快速搭建并优化Samba服务器。 ... [详细]
  • 不确定性|放入_华为机试题 HJ9提取不重复的整数
    不确定性|放入_华为机试题 HJ9提取不重复的整数 ... [详细]
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • 本文探讨了在C++中如何有效地清空输入缓冲区,确保程序只处理最近的输入并丢弃多余的输入。我们将介绍一种不阻塞的方法,并提供一个具体的实现方案。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 在高并发需求的C++项目中,我们最初选择了JsonCpp进行JSON解析和序列化。然而,在处理大数据量时,JsonCpp频繁抛出异常,尤其是在多线程环境下问题更为突出。通过分析发现,旧版本的JsonCpp存在多线程安全性和性能瓶颈。经过评估,我们最终选择了RapidJSON作为替代方案,并实现了显著的性能提升。 ... [详细]
  • 本文详细介绍了装饰者(Decorator)模式,这是一种动态地为对象添加职责的方法。与传统的继承方式不同,装饰者模式通过组合而非继承来实现功能扩展,从而提供更大的灵活性和可维护性。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
author-avatar
gogo迷失的大G
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有