一、前言
之前写过一篇 Spring 集成 WebSocket 协议的文章 —— Spring消息之WebSocket ,所以对于 WebSocket 协议的介绍就不多说了,可以参考这篇文章。这里只做一些补充说明。另外,Netty 对 WebSocket 协议的支持要比 Spring 好太多了,用起来舒服的多。
WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。
由 IETF 发布的 WebSocket RFC,定义了 6 种帧, Netty 为它们每种都提供了一个 POJO 实现。下表列出了这些帧类型,并描述了它们的用法。
二、聊天室功能说明
1、A、B、C 等所有用户都可以加入同一个聊天室。
2、A 发送的消息,B、C 可以同时收到,但是 A 收不到自己发送的消息。
3、当用户长时间没有发送消息,系统将把他踢出聊天室。
三、聊天室功能实现
1、Netty 版本
<dependency> <groupId>io.nettygroupId> <artifactId>netty-allartifactId> <version>5.0.0.Alpha2version> dependency>
2、处理 HTTP 协议的 ChannelHandler —— 非 WebSocket 协议的请求,返回 index.html 页面
public class HttpRequestHandler extends SimpleChannelInboundHandler{ private final String wsUri; private static File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { e.printStackTrace(); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 如果请求了Websocket,协议升级,增加引用计数(调用retain()),并将他传递给下一个 ChannelHandler // 之所以需要调用 retain() 方法,是因为调用 channelRead() 之后,资源会被 release() 方法释放掉,需要调用 retain() 保留资源 if (wsUri.equalsIgnoreCase(request.uri())) { ctx.fireChannelRead(request.retain()); } else { //处理 100 Continue 请求以符合 HTTP 1.1 规范 if (HttpHeaderUtil.is100ContinueExpected(request)) { send100Continue(ctx); } // 读取 index.html RandomAccessFile randomAccessFile = new RandomAccessFile(INDEX, "r"); HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK); HttpHeaders headers = response.headers(); //在该 HTTP 头信息被设置以后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端 headers.set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); boolean keepAlive = HttpHeaderUtil.isKeepAlive(request); if (keepAlive) { headers.setLong(HttpHeaderNames.CONTENT_LENGTH, randomAccessFile.length()); headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } ctx.write(response); //将 index.html 写给客户端 if (ctx.pipeline().get(SslHandler.class) == null) { ctx.write(new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length())); } else { ctx.write(new ChunkedNioFile(randomAccessFile.getChannel())); } //写 LastHttpContent 并冲刷至客户端,标记响应的结束 ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive) { channelFuture.addListener(ChannelFutureListener.CLOSE); } } } private void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
3、处理 WebSocket 协议的 ChannelHandler —— 处理 TextWebSocketFrame 的消息帧
/** * WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧 */ public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler{ private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group) { this.group = group; } @Override protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端 Channel channel = ctx.channel(); //自己发送的消息不返回给自己 group.remove(channel); group.writeAndFlush(msg.retain()); group.add(channel); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //是否握手成功,升级为 Websocket 协议 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息 // 并把握手成功的 Channel 加入到 ChannelGroup 中 ctx.pipeline().remove(HttpRequestHandler.class); group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined")); group.add(ctx.channel()); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { group.remove(ctx.channel()); ctx.writeAndFlush(new TextWebSocketFrame("由于您长时间不在线,系统已自动把你踢下线!")).addListener(ChannelFutureListener.CLOSE); } } else { super.userEventTriggered(ctx, evt); } } }
WebSocket 协议升级完成之后, WebSocketServerProtocolHandler 将会把 HttpRequestDecoder 替换为 WebSocketFrameDecoder,把 HttpResponseEncoder 替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被 WebSocket 连接所需要的 ChannelHandler。这也包括了 HttpObjectAggregator 和 HttpRequestHandler 。
4、ChatServerInitializer —— 多个 ChannelHandler 合并成 ChannelPipeline 链
public class ChatServerInitializer extends ChannelInitializer{ private final ChannelGroup group; private static final int READ_IDLE_TIME_OUT = 60; // 读超时 private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时 private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时 public ChatServerInitializer(ChannelGroup group) { this.group = group; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // 处理那些不发送到 /ws URI的请求 pipeline.addLast(new HttpRequestHandler("/ws")); // 如果被请求的端点是 "/ws",则处理该升级握手 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // //当连接在60秒内没有接收到消息时,进会触发一个 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法处理 pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS)); pipeline.addLast(new TextWebSocketFrameHandler(group)); } }
tips:上面这些开箱即用 ChannelHandler 的作用,我就不一一介绍了,可以参考上一篇文章。
5、引导类 ChatServer
public class ChatServer { private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address) { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer(channelGroup)); ChannelFuture channelFuture = bootstrap.bind(address); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); return channelFuture; } public void destroy() { if (channel != null) { channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) { final ChatServer chatServer = new ChatServer(); ChannelFuture channelFuture = chatServer.start(new InetSocketAddress(9999)); // 返回与当前Java应用程序关联的运行时对象 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { chatServer.destroy(); } }); channelFuture.channel().closeFuture().syncUninterruptibly(); } }
三、效果展示
在浏览器中输入 http://127.0.0.1:9999 即可看到预先准备好的 index.html 页面;访问 ws://127.0.0.1:9999/ws (可随意找一个 WebSocket 测试工具测试)即可加入聊天室。
有点 low 的聊天室总算是完成了,算是 Netty 对 HTTP 协议和 WebSocket 协议的一次实践吧!虽然功能欠缺,但千里之行,始于足下!不积硅步,无以至千里;不积小流,无以成江海!
参考资料:《Netty IN ACTION》
演示源代码:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/chatroom