Dubbo是一个RPC开源框架,自定义Dubbo协议实现远程调用。一、Dubbo官方架构图官方给的架构图主要分为了4个版块:注册中心Registry、监视器Moni
Dubbo是一个RPC开源框架,自定义Dubbo协议实现远程调用。
一、Dubbo 官方架构图
官方给的架构图主要分为了4个版块: 注册中心Registry、监视器Monitor、服务提供者Provider、服务消费者Consumer。
二、功能解析
在学习Dubbo 源码前,需要了解Dubbo是一个用Java实现的高性能RPC框架, 主要功能包含服务注册与发现、集群容错、远程调用、负载均衡、高度可扩展、运行期流量调度、可视化的服务治理与运维等功能。
三、从消费方解析Dubbo 源码是如何实现调用
首先我们先看一下Consumer的main方法:
1. 初始化xml配置。
2. 启动Spring 容器。
3. 获取Bean对象,即远程服务对象,其实是一个代理的对象。
4. 执行服务的目标方法。
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.dubbo.demo.consumer;import org.apache.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class Consumer {/*** To get ipv6 address to work, add* System.setProperty("java.net.preferIPv6Addresses", "true");* before running your application.*/public static void main(String[] args) {// 加载配置文件ClassPathXmlApplicationContext cOntext= new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});// 启动Spring 容器, 如果调用getBean()方法,Spring 容器会默认启动context.start();// 获取代理对象DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy// 获取接口的代理对象while (true) {try {Thread.sleep(1000);// 执行代理对象里的方法String hello = demoService.sayHello("world"); // call remote methodSystem.out.println(hello); // get result} catch (Throwable throwable) {throwable.printStackTrace();}}}
}
为了方便理解,我们可以把消费者远程调用其他服务的过程拆分成以下几个步骤:
1. 准备工作。在Spring 容器启动的时候就开始准备,初始化服务、服务需要的registry、Bean、从注册中心拉取服务地址列表等。
2. 服务发现与导入, 服务生产者把服务推送到zookeeper registry,服务消费者订阅注册中心zookeeper, 从zookeeper 注册中心拉取缓存到本地。
3. 服务调用, consumer 调用远程服务, 包含服务容错机制、负载均衡策略等。
实际上,dubbo源码的执行流程比上面描述的要复杂很多,在这里我主要是为了去理解远程调用的执行流程和设计思想,先把它概括成三个步骤: 准备、服务导入和服务发现、服务调用。
Consumer 初始化
1. 初始化 consumer 配置文件,加载并解析dubbo-demo-consumer.xml配置。
首先我们可以找到加载xml配置文件的类DubboNamespaceHandler,在此类下的init()方法下打一个断点,如下图:
在此行停顿,我们可以发现用到了一个ReferenceBean的类,它其实是META-INF.spring目录下dubbo-demo-consumer.xml文件里配置的 reference:
同时在main()方法的 context.start()方法处打一个断点:
可以发现加载配置的代码是在.start()方法之前执行的,简单的说是通过new ClassPathXmlApplicationContext()方法初始化配置文件。
ClassPathXmlApplicationContext cOntext= new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
2. 初始化referenceBean流程解析
referenceBean 是远程调用服务关键的类,实现了FatoryBean 等Spring 相关的接口,用来读取并初始化标签里配置的interface等属性,referenceBean 里的执行流程可以概况为如下:
DubboNamespaceHandler:
ReferenceBean
. getObject() # 实现org.springframework.beans.factory.FactoryBean 接口里的getObject() 方法
get():
# 在init()方法里初始化consumer
init()
最终是在init()方法里初始化整个consumer的服务。
3. ReferenceConfig类里的init()方法解析
init()方法里包含很多初始化的过程,给后面要用到的变量进行赋值。例如初始化要调用的接口名,接口方法、初始化zookeeper注册中心连接、监视器等
先重点看一下此方法里初始化的一个重要的Map, 里面包含了远程接口的各种信息,如应用名称、dubbo版本、方法名称、自己配置的代理模式(Jdk动态代理)等,此map会作为创建Proxy的参数列表。
根据map创建代理对象
ref = createProxy(map);
然后使用DubboSPI 机制获取生成代理对象的工厂,Dubbo的SPI核心实现在ExtensionLoader和ExtensionDirector。
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
点击进入到Protocol接口,可以查看@SPI("dubbo"),选择到的使用的协议是dubbo协议。
继续进入到createProxy()方法里面, 可以找到 return (T) proxyFactory.getProxy(invoker) 方法处, 如果了解动态代理模式,就能很容易理解此方法的作用是通过代理工厂生成代理对象,最终将代理对象返回。
3. 深入研究创建代理对象的机制。
由上述2可以发现,获取到最终的接口对象是通过动态代理创建的,而dubbo 默认使用的代理工厂的方式是 JavassistProxyFactory
我们也可以通过Jdk动态代理工厂来生成代理对象, 在consumer的 dubbo:reference 标签里添加proxy属性为 "jdk"
配置好后,dubbo就会使用JdkProxyFactory来创建代理对象。
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.dubbo.rpc.proxy.jdk;import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.proxy.AbstractProxyFactory;
import org.apache.dubbo.rpc.proxy.AbstractProxyInvoker;
import org.apache.dubbo.rpc.proxy.InvokerInvocationHandler;import java.lang.reflect.Method;
import java.lang.reflect.Proxy;/*** JavaassistRpcProxyFactory*/
public class JdkProxyFactory extends AbstractProxyFactory {@Override@SuppressWarnings("unchecked")// 使用jdk动态d代理public T getProxy(Invoker invoker, Class>[] interfaces) {return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));}@Overridepublic Invoker getInvoker(T proxy, Class type, URL url) {return new AbstractProxyInvoker(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class>[] parameterTypes,Object[] arguments) throws Throwable {Method method = proxy.getClass().getMethod(methodName, parameterTypes);return method.invoke(proxy, arguments);}};}
}
如果不是很理解JDK动态代理,可以参考如下文章:
JDK动态代理模式详解_Dream_it_possible!的博客-CSDN博客
服务发现与导入
Directory
在AbstractClusterInvoker抽象类里维护了一个 directory目录,该目录用来维护了从zookeeper注册中心里的目录节点数据,因为本身zookeeper是一个文件系统。
Dubbo通过容错机制,再调用失败时会重新从zookeeper里拉取所有的目录节点。在FailoverClusterInvoker类的 doInvoke() 方法有个list()方法:
if (i > 0) {checkWhetherDestroyed();copyinvokers = list(invocation);// check againcheckInvokers(copyinvokers, invocation);}
上面的服务发现与导入是为了下面的服务调用做准备工作。
服务调用
dubbo远程调用的结构图:
Mock为服务容错,Cluster为集群容错。
# 最外层为MockClusterInvoker
MockClusterInvoker
# 集群容错策略: 默认为FailOverCluterInvoker
FailOverClusterInvoker
# 负载均衡策略
LoadBalance
# 服务列表
List invokers
最终是通过invoker.invoke(invocation)执行远程调用,底层是通过Netty的NIO将方法对象和Url发送到服务提供者,由服务提供者去处理方法url和方法对象。
服务容错机制
在MockClusterInvoker下一层的FailOverClusterInvoker类里能找到服务容错的方法, 如果没有配置重试次数,默认为2次,因此总共会发3次请求,如下:
继续往下看,可以发现,在此处有个for 循环,因为FailOverClusterInvoker的策略是失败重试,直到有个成功就返回!
服务容错FailOverClusterInvoker的效果演示:
负载均衡机制
在每次调用前,会使用负载均衡策略去选择一个节点进行调用,dubbo默认地负载均衡策略是随机。
Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
同样地dubbo也可以通过配置更改负载均衡策略!