热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty实现RPC的思路

RPCRPC(RemoteProcedureCall)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。
RPC

RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。

Netty实现RPC的思路
image.png

分布式组件中:外部RESTful内部RPC。

RPC调用流程

Netty实现RPC的思路
image.png
  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

通过客户端代理,实现通过netty通信,实现接口的远程调用。

code

RPC客户端

package com.pl.netty.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClient * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClient { //创建一个线程池 private static ExecutorService executor= Executors.newFixedThreadPool(5); private static NettyClientHandler client; //编写方法,使用代理模式,获取一个代理对象 public Object getBean(final Class> serviceClass, final String providerName) { //通过代理对目标对象的方法进行增强 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class>[]{serviceClass}, (proxy, method, args) -> { System.out.println("代理被调用"); if (client == null) initClient(); //方法参数 client.setPara(providerName + args[0]); return executor.submit(client).get(); }); } //初始化客户端 private static void initClient() { client = new NettyClientHandler(); //创建EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }

NettyClientHandler

package com.pl.netty.rpc.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClientHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; //上下文 private String result; //返回的结果 private String para; //客户端调用方法时,传入的参数 //与服务端创建连接后调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("通道连接成功"); cOntext= ctx; //因为我们在其他方法会使用到 ctx } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); notify(); //唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //被代理对象的调用,真正发送数据给服务器,发送完后就阻塞,等待被唤醒(channelRead) @Override public synchronized Object call() throws Exception { System.out.println("线程被调用-----"); context.writeAndFlush(para); //进行wait wait(); //等待 channelRead 获取到服务器的结果后,进行唤醒。 return result; //服务方返回的结果 } public void setPara(String para){ this.para = para; } }

ClientBootStrap

package com.pl.netty.rpc.client;

import com.pl.netty.rpc.server.HelloService;

/**
 * 

* * @Description: TODO *

* @ClassName ClientBootStrap * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class ClientBootStrap { //这里定义协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws InterruptedException { //创建一个消费者 NettyClient customer = new NettyClient(); //创建代理对象 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); //通过代理对象调用服务提供者的方法 String res = service.hello("你好 Dubbo"); System.out.println("调用的结果,res = " + res); Thread.sleep(2000); } }

RPC服务端

NettyServer

package com.pl.netty.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServer * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServer { public static void startServer(String hostName, int port) { startServer0(hostName, port); } private static void startServer0(String hostname, int port) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //业务处理器 } }); ChannelFuture channelFuture = serverBootstrap.bind(hostname,port).sync(); System.out.println("服务提供方开始运行"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { startServer("127.0.0.1",7000); } }

NettyServerHandler

package com.pl.netty.rpc.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServerHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息,并调用服务 System.out.println("msg=" + msg); //客户端在调用服务器的api 时,我们需要定义一个协议 //比如要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#你好" if (msg.toString().startsWith("HelloService#hello#")) { String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

HelloService

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloService * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public interface HelloService { String hello(String message); }

HelloServiceImpl

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloServiceImpl * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class HelloServiceImpl implements HelloService { @Override public String hello(String message) { System.out.println("收到客户端消息=" + message); //根据 message 返回不同的结果 if(message != null) { return "你好客户端,我已经收到你的消息【" + message + "】"; } else { return "你好客户端,我已经收到你的消息。"; } } }

输出

Netty实现RPC的思路
image.png

推荐阅读
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 本文全面解析了 gRPC 的基础知识与高级应用,从 helloworld.proto 文件入手,详细阐述了如何定义服务接口。例如,`Greeter` 服务中的 `SayHello` 方法,该方法在客户端和服务器端的消息交互中起到了关键作用。通过实例代码,读者可以深入了解 gRPC 的工作原理及其在实际项目中的应用。 ... [详细]
  • 深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用
    深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用 ... [详细]
  • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 阿里巴巴Java后端开发面试:TCP、Netty、HashMap、并发锁与红黑树深度解析 ... [详细]
  • 如何使用 `com.amazonaws.services.sqs.model.DeleteMessageRequest` 的 `getQueueUrl()` 方法及其代码示例解析 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 如何在PHP中准确获取服务器IP地址?
    如何在PHP中准确获取服务器IP地址? ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
author-avatar
手机用户2502927973
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有