热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty协议应用和设计

一、协议的重要性客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。redis的协议redis协议遵循的规则是位数-命令,位数-命令,位

一、协议的重要性

客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。

redis的协议

redis协议遵循的规则是 位数-命令,位数-命令,位数-命令....

例子:

package com.test.netty.c7; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @Slf4j public class RedisClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); //按照redis的协议 final byte[] LINE = {'\r', '\n'}; try { ChannelFuture cOnnect= new Bootstrap().group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf =ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$4".getBytes()); buf.writeBytes(LINE); buf.writeBytes("name".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$5".getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbbbb".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug(((ByteBuf) msg).toString(Charset.defaultCharset())); } }).connect(new InetSocketAddress("localhost", 6379)); connect.sync(); connect.channel().close().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }

写完之后,在redis端查询"name":

HTTP协议

HTTP协议相对比较复杂,Netty已经共了服务器端编码和解码的工具类HttpServerCodec来处理HTTP请求。

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器 public final class HttpServerCodec extends CombinedChannelDuplexHandler implements HttpServerUpgradeHandler.SourceCodec

服务器端代码:

package com.test.netty.c7; import com.test.nio.c3.block.Server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; @Slf4j public class HttpServer { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { log.debug("请求地址:{}",msg.uri()); //设置响应内容,版本、状态码 DefaultFullHttpResponse respOnse= new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); //设置响应内容 //设置长度,否则浏览器一直在读取中 byte[] bytes = "

Hello,MG!

".getBytes(StandardCharsets.UTF_8); response.headers().setInt(CONTENT_LENGTH, bytes.length); //设置响应体 response.content().writeBytes(bytes); //写会响应 ctx.writeAndFlush(response); } }); } }).bind(8080); } }

浏览器访问:

二、自定义协议

自定义协议的要素

  • 魔术:协议双方约定的开头,这样双方能够第一时间知道是否可以解析传递过来的消息
  • 版本号:协议的版本
  • 序列化算法:用什么方式类序列化传递的信息
  • 指令类型:跟业务相关的操作
  • 请求序号:为了双工通信,提供异步的能力
  • 正文长度:正文的长度
  • 消息正文:消息体

实例:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import lombok.extern.slf4j.Slf4j; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.List; @Slf4j public class MessageCodec extends ByteToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { //设置魔数 4个字节 out.writeBytes(new byte[]{'m', 'g', '2' ,'1'}); //设置版本号 out.writeByte(1); //设置序列化方式 out.writeByte(1); //设置指令类型 - 业务 out.writeByte(msg.getMessageType()); //设置请求序号 out.writeInt(msg.getSequenceId()); //补齐 out.writeByte(0xff); //序列化 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); objectOutputStream.writeObject(msg); byte[] bytes = byteArrayOutputStream.toByteArray(); //获得并设置政委长度 out.writeInt(bytes.length); out.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { //魔数 int magicNum = in.readInt(); //版本号 byte version = in.readByte(); //序列化方式 byte seqType = in.readByte(); //指令类型 byte messageType = in.readByte(); //获取请求编号 int sequenceId = in.readInt(); //补齐 in.readByte(); //获取正文长度 int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); out.add(message); log.debug("魔数 {}", magicNum); log.debug("版本 {}", version); log.debug("序列化方式 {}", seqType); log.debug("消息类型 {}", messageType); log.debug("请求序列 {}", sequenceId); log.debug("长度 {}", length); log.debug("消息 {}", message); } }

  • 自定义编码解码handler,实现了 ByteToMessage 接口
  • 在对应的编码、解码方法里面,按照之前介绍的内容进行按顺序进行编写即可

测试类:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class TestCodec { public static void main(String[] args) throws Exception{ EmbeddedChannel channel = new EmbeddedChannel( new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), new LoggingHandler(LogLevel.DEBUG), new MessageCodec() ); LoginRequestMessage loginRequestMessage = new LoginRequestMessage("liming", "2292123a"); ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, loginRequestMessage, byteBuf); channel.writeInbound(byteBuf); } }

运行结果:

 三、Sharable 注解

为了提高handler的使用率,netty使用了注解@Sharable注解对handler进行标识,其实就是一个handler是否是线程安全的(也就是无状态的),如果是线程安全的就可以使用@Sharable进行标识,表明这个handler可以共用,否则每次都需要new 一个新的handler。

之前写的 MessageCodec handler理论上市线程安全的,但是它的父类是线程不安全的,所以使用

@ChannelHandler.Sharable public class MessageSharableCodec extends MessageToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, List out) throws Exception { ... } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { ... } }

MessageToMessageCodec 这个父类即可。


推荐阅读
author-avatar
gogo迷失的大G
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有