作者:常依sunrise | 来源:互联网 | 2023-09-11 12:25
上个版本只是为了实现简单的远程调用,只能请求指定类,并且返回字符串。本次优化,使得可以请求服务端开放的任意服务,并且返回任意对象。
优化请求信息,请求信息中携带消息id(作用很大,稍后会有介绍)、接口名、方法名、参数信息等。响应信息中有消息id、响应信息等。对象序列化方式暂时采用jdk自带序列化,因序列化不影响使用,因此暂不优化,以后版本提供扩展入口。
优化思路图
项目结构
- dubbo-server: 服务端
- dubbo-client: 消费端
- dubbo-server-service: 接口信息,消费端引入此模块
- dubbo-framework: dubbo框架模块。
核心代码
因请求和响应传输的对象不同。因此定义为request/response两个对象名称为DubboRequest
和DubboResponse
。
每次请求都需要有一个id,也许这个id不需要全局唯一,但是在同一时间,不能存在同样id的请求,这个原因,我会在下一篇文章种重点说明。这里采用Atomic
的getAndIncrement
方法获取id,主要是为了即使在多线程的情况下,也可以安全的获取到唯一的标识,这里不了解Atomic包下类作用的,可以自行百度下。
public class DubboRequest implements Serializable
{private static final AtomicLong INVOKE_ID = new AtomicLong(0);private final long mId;private Object mData;public DubboRequest (){this.mId = newId();}private static final long newId (){return INVOKE_ID.getAndIncrement();}
}
在DubboRequest的mData中存放RpcInvocation
对象。这个对象的名字也很有深意。有兴趣的小伙伴可以百科下Invacation模型。
public class RpcInvocation implements Serializable
{private static final long serialVersionUID &#61; -4355285085441097045L;private String interfacePath;private String methodName;private Class<?>[] parameterTypes;private Object[] arguments;private Map<String, String> attachments;
}
public class DubboResponse implements Serializable
{private long mId;private Object mData;
}
为了容易理解&#xff0c;协议的信息&#xff0c;还继续采用简单方式。一个Int类型的值存放总数据大小&#xff0c;然后后面是字节数组。
DubboClientEncoder.java编码代码。
public class DubboClientEncoder extends MessageToByteEncoder<DubboRequest>
{&#64;Overrideprotected void encode(ChannelHandlerContext ctx, DubboRequest req, ByteBuf out) throws Exception {byte[] serialize &#61; SerializeUtil.serialize (req);out.writeInt(serialize.length);out.writeBytes(serialize);}
}
服务开放方式&#xff0c;为了使得协议更加容易理解&#xff0c;暂时使用简陋的方式&#xff0c;简单存放在map中。
private static final Map<String,Class> exportServiceMap &#61; new HashMap<> ();static {exportServiceMap.put ("com.test.dubbo.service.UserService",UserServiceImpl.class);exportServiceMap.put ("com.test.dubbo.service.OrderService", OrderServiceImpl.class);}
服务端接收到请求信息时&#xff0c;对请求信息进行解码&#xff0c;并且找到对应的接口调用并且返回。
&#64;Overrideprotected void channelRead0 (ChannelHandlerContext ctx, DubboRequest req)throws Exception{DubboResponse rsp &#61; new DubboResponse ();RpcInvocation rpc &#61;(RpcInvocation) req.getMData ();rsp.setMId (req.getMId ());Class aClass &#61; exportServiceMap.get (rpc.getInterfacePath ());if(aClass!&#61;null){Method method &#61; aClass.getMethod (rpc.getMethodName (), rpc.getParameterTypes ());Object invoke &#61; method.invoke (aClass.newInstance (), rpc.getArguments ());rsp.setMData (invoke);}ctx.writeAndFlush (rsp);}
客户端调用代码&#xff0c;调用服务端的两个接口&#xff0c;响应结果打印。
public static void main (String[] args){RpcProxy rpcProxy &#61; new RpcProxy();UserService bean &#61; rpcProxy.getBean (UserService.class);User user &#61; bean.getUserNameById (12l);System.out.println ("userName:"&#43;user.getName ());OrderService orderService &#61; rpcProxy.getBean (OrderService.class);List<Order> orderList &#61; orderService.getOrderList ();System.out.println ("orders:"&#43;orderList);}
如此一来&#xff0c;通过RpcInvocation更加深入了解了Invocation的作用。
源码地址&#xff1a;gitee