热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty拆包粘包问题解决——特殊结束符

本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。

客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

服务端

server

package com.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;//服务器端
public class MyServer {//监听线程组,监听客户端请求private EventLoopGroup acceptorGroup = null;//处理客户端相关操作线程组,负责处理与客户端的数据通信private EventLoopGroup clientGroup = null;//服务启动相关配置信息,服务端Bootstrap带serverprivate ServerBootstrap serverBootstrap = null;public MyServer(){init();}private void init(){//初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量acceptorGroup = new NioEventLoopGroup();clientGroup = new NioEventLoopGroup();//初始化服务的配置serverBootstrap = new ServerBootstrap();//绑定线程组,acceptorGroup监听信息,clientGroup客户端信息serverBootstrap.group(acceptorGroup,clientGroup);//设定通信模式为NIO,同步非阻塞serverBootstrap.channel(NioServerSocketChannel.class);//设定缓冲区大小,缓冲区单位是字节serverBootstrap.option(ChannelOption.SO_BACKLOG,1024);//SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024).option(ChannelOption.SO_RCVBUF,16*1024).option(ChannelOption.SO_KEEPALIVE,true);}public ChannelFuture doAccept(int port) throws InterruptedException {/*** childHandler是服务的bootstrap独有的方法,用于提供处理对象* 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理*/serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符,定义的数据分隔符一定是一个ByteBuf类型的数据对象ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] acceptorHandlers = new ChannelHandler[3];//处理固定结束标记符号的Handler,这个Handler没有@Sharabler注解修饰//必须每次初始化通道时创建一个新的对象//使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度,Netty建议数据有最大长度acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换成字符串acceptorHandlers[1] = new StringDecoder(Charset.forName("utf-8"));acceptorHandlers[2] = new MyServerHandler();socketChannel.pipeline().addLast(acceptorHandlers);}});/*** bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可* sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果* 可以使用ChannelFuture实现后续的服务器和客户端的交互*/ChannelFuture future = serverBootstrap.bind(port).sync();/*绑定多个端口serverBootstrap.bind(port);serverBootstrap.bind(port);*/return future;}/*** shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求*/public void release(){this.acceptorGroup.shutdownGracefully();this.clientGroup.shutdownGracefully();}public static void main(String[] args){ChannelFuture future = null;MyServer myServer = null;try {myServer = new MyServer();future = myServer.doAccept(9999);System.out.println("server started");//关闭连接future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != myServer){myServer.release();}}}
}

serverHandler

package com.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;import java.io.UnsupportedEncodingException;/*** @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用* 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象*/
@Sharable
public class MyServerHandler extends ChannelHandlerAdapter{/*** 业务处理逻辑* 用于处理读取数据请求的逻辑* ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel* msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {String message = msg.toString();System.out.println("from client :"+message);String line = "server message $E$ test delimiter handler!! $E$ second message $E$";if ("exit".equals(message)){ctx.close();return;}//写操作自动释放缓存,避免内存溢出ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));/*如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8")));ctx.close();*/}/*** 异常处理逻辑,当客户端异常退出时也会执行* ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){System.out.println("server exceptionCaught method run..");ctx.close();}
}

客户端

client

package com.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;/*** 客户端是请求的发起者,不需要监听* 只需要定义唯一的一个线程组即可*/
public class CustorClient {//处理请求和处理服务端响应的线程组private EventLoopGroup group = null;//客户端服务启动相关配置信息private Bootstrap bootstrap = null;public CustorClient(){init();}private void init(){group = new NioEventLoopGroup();bootstrap = new Bootstrap();//绑定线程组bootstrap.group(group);//设定通讯模式为NIObootstrap.channel(NioSocketChannel.class);}public ChannelFuture doRequest(String host, int port) throws InterruptedException {/*** 客户端的bootstrap没有childHandler方法,只有handler方法* 方法含义等同于ServerBootstrap中的childHandler* 在客户端必须绑定处理器(必须调用handler方法)* 服务器必须绑定处理器(必须调用childHandler方法)*/this.bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] handlers = new ChannelHandler[3];handlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器handlerhandlers[1] = new StringDecoder(Charset.forName("utf-8"));handlers[2] = new CustorClientHandler();socketChannel.pipeline().addLast(handlers);}});//建立连接ChannelFuture future = this.bootstrap.connect(host,port).sync();return future;}public void release(){this.group.shutdownGracefully();}public static void main(String[] atgs){CustorClient client = null;ChannelFuture future = null;try {client = new CustorClient();future = client.doRequest("localhost", 9999);Scanner s = null;while (true) {s = new Scanner(System.in);System.out.println("enter message send to server(enter 'exit' for close client)");String line = s.nextLine();if ("exit".equals(line)) {/*** addListener - 增加监听,当条件满足时候,出发监听器* ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接*/future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))).addListener(ChannelFutureListener.CLOSE);break;}//Unpooled工具类用来做buffer转换future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));//睡一秒读取信息TimeUnit.SECONDS.sleep(1);}}catch (Exception e){e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != client){client.release();}}}
}

clientHandler

package com.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;import java.io.UnsupportedEncodingException;public class CustorClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException {try {String message = msg.toString();System.out.println("form server:"+message);} finally {//释放资源,避免内存溢出ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){System.out.println("client exceptionCaught method run..");cxt.close();}
}

测试

 


推荐阅读
  • Netty基础教程:构建简易Netty客户端与服务器
    Java NIO是解决传统阻塞I/O问题的关键技术之一,但其复杂性给开发者带来了挑战。Netty作为一个成熟的网络编程框架,极大地简化了这一过程。本文将通过一个简单的示例,介绍如何使用Netty创建基本的客户端和服务器。 ... [详细]
  • 深入理解BIO与NIO的区别及其应用
    本文详细探讨了BIO(阻塞I/O)和NIO(非阻塞I/O)之间的主要差异,包括它们的工作原理、性能特点以及应用场景,旨在帮助开发者更好地理解和选择适合的I/O模型。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 将Web服务部署到Tomcat
    本文介绍了如何在JDeveloper 12c中创建一个Java项目,并将其打包为Web服务,然后部署到Tomcat服务器。内容涵盖从项目创建、编写Web服务代码、配置相关XML文件到最终的本地部署和验证。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • andr ... [详细]
  • 本文详细介绍了如何在Ubuntu系统中下载适用于Intel处理器的64位版本,涵盖了不同Linux发行版对64位架构的不同命名方式,并提供了具体的下载链接和步骤。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 本文详细探讨了JDBC(Java数据库连接)的内部机制,重点分析其作为服务提供者接口(SPI)框架的应用。通过类图和代码示例,展示了JDBC如何注册驱动程序、建立数据库连接以及执行SQL查询的过程。 ... [详细]
  • 本文介绍了Android开发中Intent的基本概念及其在不同Activity之间的数据传递方式,详细展示了如何通过Intent实现Activity间的跳转和数据传输。 ... [详细]
  • 开发笔记:9.八大排序
    开发笔记:9.八大排序 ... [详细]
author-avatar
琳琳小朋友m
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有