热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty实现RPC的思路

RPCRPC(RemoteProcedureCall)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。
RPC

RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。

Netty实现RPC的思路
image.png

分布式组件中:外部RESTful内部RPC。

RPC调用流程

Netty实现RPC的思路
image.png
  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

通过客户端代理,实现通过netty通信,实现接口的远程调用。

code

RPC客户端

package com.pl.netty.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClient * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClient { //创建一个线程池 private static ExecutorService executor= Executors.newFixedThreadPool(5); private static NettyClientHandler client; //编写方法,使用代理模式,获取一个代理对象 public Object getBean(final Class> serviceClass, final String providerName) { //通过代理对目标对象的方法进行增强 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class>[]{serviceClass}, (proxy, method, args) -> { System.out.println("代理被调用"); if (client == null) initClient(); //方法参数 client.setPara(providerName + args[0]); return executor.submit(client).get(); }); } //初始化客户端 private static void initClient() { client = new NettyClientHandler(); //创建EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }

NettyClientHandler

package com.pl.netty.rpc.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClientHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; //上下文 private String result; //返回的结果 private String para; //客户端调用方法时,传入的参数 //与服务端创建连接后调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("通道连接成功"); cOntext= ctx; //因为我们在其他方法会使用到 ctx } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); notify(); //唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //被代理对象的调用,真正发送数据给服务器,发送完后就阻塞,等待被唤醒(channelRead) @Override public synchronized Object call() throws Exception { System.out.println("线程被调用-----"); context.writeAndFlush(para); //进行wait wait(); //等待 channelRead 获取到服务器的结果后,进行唤醒。 return result; //服务方返回的结果 } public void setPara(String para){ this.para = para; } }

ClientBootStrap

package com.pl.netty.rpc.client;

import com.pl.netty.rpc.server.HelloService;

/**
 * 

* * @Description: TODO *

* @ClassName ClientBootStrap * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class ClientBootStrap { //这里定义协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws InterruptedException { //创建一个消费者 NettyClient customer = new NettyClient(); //创建代理对象 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); //通过代理对象调用服务提供者的方法 String res = service.hello("你好 Dubbo"); System.out.println("调用的结果,res = " + res); Thread.sleep(2000); } }

RPC服务端

NettyServer

package com.pl.netty.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServer * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServer { public static void startServer(String hostName, int port) { startServer0(hostName, port); } private static void startServer0(String hostname, int port) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //业务处理器 } }); ChannelFuture channelFuture = serverBootstrap.bind(hostname,port).sync(); System.out.println("服务提供方开始运行"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { startServer("127.0.0.1",7000); } }

NettyServerHandler

package com.pl.netty.rpc.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServerHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息,并调用服务 System.out.println("msg=" + msg); //客户端在调用服务器的api 时,我们需要定义一个协议 //比如要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#你好" if (msg.toString().startsWith("HelloService#hello#")) { String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

HelloService

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloService * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public interface HelloService { String hello(String message); }

HelloServiceImpl

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloServiceImpl * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class HelloServiceImpl implements HelloService { @Override public String hello(String message) { System.out.println("收到客户端消息=" + message); //根据 message 返回不同的结果 if(message != null) { return "你好客户端,我已经收到你的消息【" + message + "】"; } else { return "你好客户端,我已经收到你的消息。"; } } }

输出

Netty实现RPC的思路
image.png

推荐阅读
author-avatar
手机用户2502927973
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有