热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty学习之TCP粘包/拆包

一.TCP粘包拆包问题说明,如图二.未考虑TCP粘包导致功能异常案例按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间1.服务端类packag
一.TCP粘包/拆包问题说明,如图



二.未考虑TCP粘包导致功能异常案例
    按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间

1.服务端类

package com.phei.netty.s2016042302;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 服务端
* @author renhj
*
*/
public class TimeServer {

public void bind(int port) throws Exception{

//第一个用户服务器接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//第二个用户SocketChannel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建ServerBootstrap对象,启动 NIO服务端的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).
//设置为NIO
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务器监听端口关闭
f.channel().closeFuture().sync();

}finally{
//释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

private class ChildChannelHandler extends ChannelInitializer{

@Override
protected void initChannel(SocketChannel arg0) throws Exception {

arg0.pipeline().addLast(new TimeServerHandler());

}
}

public static void main(String[] args) throws Exception {

int port = 8080;
if(args != null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){
//采用默认值
}
}
new TimeServer().bind(port);
}

}

2.服务端核心处理类

package com.phei.netty.s2016042302;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 服务端核心处理类
* @author renhj
*
*/
public class TimeServerHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8").substring(0,req.length-System.getProperty("line.separator").length());
System.out.println("The time server receive order :"+ body+" ; the counter is :"+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime = currentTime +System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}





}

3.客户端类

package com.phei.netty.s2016042302;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 客服端
* @author renhj
*
*/

public class TimeClient {

public void connect(int port , String host) throws Exception{

//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer(){

@Override
public void initChannel(SocketChannel ch) throws Exception{

ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();

}finally{
//释放NIO线程
group.shutdownGracefully();
}

}
public static void main(String[] args) throws Exception{

int port = 8080;
if(args !=null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){

}
}
new TimeClient().connect(port, "127.0.0.1");
}
}


4.客户端核心处理类


package com.phei.netty.s2016042302;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 客户端核心处理类
* @author renhj
*
*/
public class TimeClientHandler extends ChannelHandlerAdapter {

private int counter;
private byte[] req;

public TimeClientHandler(){
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

ByteBuf message = null;
for(int i=0;i<100;i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is : "+body +" ; the counter is : "+ ++counter);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

//释放资源
ctx.close();
}


}

5.运行结果





三.利用LineBasedFrameDecoder+StringDecoder(换行符的方式)解决TCP粘包问题


1.服务端类


package com.phei.netty.s20160423;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 服务端
* @author renhj
*
*/
public class TimeServer {

public void bind(int port) throws Exception{

//第一个用户服务器接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//第二个用户SocketChannel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建ServerBootstrap对象,启动 NIO服务端的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).
//设置为NIO
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务器监听端口关闭
f.channel().closeFuture().sync();

}finally{
//释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

private class ChildChannelHandler extends ChannelInitializer{

@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//解决TCP粘包问题
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

arg0.pipeline().addLast(new TimeServerHandler());

}
}

public static void main(String[] args) throws Exception {

int port = 8080;
if(args != null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){
//采用默认值
}
}
new TimeServer().bind(port);
}

}

2.服务端核心处理类


package com.phei.netty.s20160423;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 服务端核心处理类
* @author renhj
*
*/
public class TimeServerHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String body = (String)msg;
System.out.println("The time server receive order :"+ body+" ; the counter is :"+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime = currentTime +System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}





}

3.客户端类

  
package com.phei.netty.s20160423;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 客服端
* @author renhj
*
*/

public class TimeClient {

public void connect(int port , String host) throws Exception{

//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer(){

@Override
public void initChannel(SocketChannel ch) throws Exception{
//解决TCP粘包问题
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();

}finally{
//释放NIO线程
group.shutdownGracefully();
}

}
public static void main(String[] args) throws Exception{

int port = 8080;
if(args !=null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){

}
}
new TimeClient().connect(port, "127.0.0.1");
}
}

4.客户端核心处理类

package com.phei.netty.s20160423;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 客户端核心处理类
* @author renhj
*
*/
public class TimeClientHandler extends ChannelHandlerAdapter {

private int counter;
private byte[] req;

public TimeClientHandler(){
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

ByteBuf message = null;
for(int i=0;i<100;i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


String body = (String)msg;
System.out.println("Now is : "+body +" ; the counter is : "+ ++counter);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

//释放资源
ctx.close();
}


}

5.运行结果,完美解决




推荐阅读
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文详细解析了客户端与服务器之间的交互过程,重点介绍了Socket通信机制。IP地址由32位的4个8位二进制数组成,分为网络地址和主机地址两部分。通过使用 `ipconfig /all` 命令,用户可以查看详细的IP配置信息。此外,文章还介绍了如何使用 `ping` 命令测试网络连通性,例如 `ping 127.0.0.1` 可以检测本机网络是否正常。这些技术细节对于理解网络通信的基本原理具有重要意义。 ... [详细]
  • 在Linux系统中避免安装MySQL的简易指南
    在Linux系统中避免安装MySQL的简易指南 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 单片微机原理P3:80C51外部拓展系统
      外部拓展其实是个相对来说很好玩的章节,可以真正开始用单片机写程序了,比较重要的是外部存储器拓展,81C55拓展,矩阵键盘,动态显示,DAC和ADC。0.IO接口电路概念与存 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • 本文详细介绍了如何使用Python中的smtplib库来发送带有附件的邮件,并提供了完整的代码示例。作者:多测师_王sir,时间:2020年5月20日 17:24,微信:15367499889,公司:上海多测师信息有限公司。 ... [详细]
  • 解决Parallels Desktop错误15265的方法
    本文详细介绍了在使用Parallels Desktop时遇到错误15265的多种解决方案,包括检查网络连接、关闭代理服务器和修改主机文件等步骤。 ... [详细]
  • 详解 Qt 串口通信程序全程图文 (4)
    Qt串口通信程序全程图文是本文介绍的内容,本文一开始先讲解对程序的改进,在文章最后将要讲解一些重要问题。1、在窗口中加入一些组合框ComboBox&# ... [详细]
  • 解决 Windows Server 2016 网络连接问题
    本文详细介绍了如何解决 Windows Server 2016 在使用无线网络 (WLAN) 和有线网络 (以太网) 时遇到的连接问题。包括添加必要的功能和安装正确的驱动程序。 ... [详细]
  • 如何在Linux服务器上配置MySQL和Tomcat的开机自动启动
    在Linux服务器上部署Web项目时,通常需要确保MySQL和Tomcat服务能够随系统启动而自动运行。本文将详细介绍如何在Linux环境中配置MySQL和Tomcat的开机自启动,以确保服务的稳定性和可靠性。通过合理的配置,可以有效避免因服务未启动而导致的项目故障。 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
author-avatar
中医鸣芳
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有