首先看来自官方文档
这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。
LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。
以前自己看到上面的文字,理解不太深。随着对分布式系统的深入接触,以及DUBBO源码的研究,才有了更精确的理解。总结一句话:阅读技术类文档时,对一些语言的理解,受到阅读者自身情况的限制,往往理解有深有浅。闲话少说。本文,就是针对图中的组件,一个个进行剖析。
1.Invoker
Invoker 是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
1.1 API
public interface Invoker extends Node {
/**
* get service interface.
* @return service interface.
*/
Class getInterface();
/**
* invoke.
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
1.2 层次树结构
分析可知:主要有2个分支
1.AbstractInvoker:主要具体远程实现,和RPC 协议有关,属于dubbo-rpc-api范畴。
2.AbstractClusterInvoker:主要逻辑包括Invoker的选择,和高可用有关,属于dubbo-cluster范畴。
1.3 AbstractInvoker VS AbstractClusterInvoker
1.3.1 类图比较
通过下图可以很容易发现:AbstractClusterInvoker比较AbstractInvoker,多了些select,doselect,reselect方法。这些方法的作用也可以猜到。
1.3.2 以两个实现类对比
选择AbstractInvoker的子类DubboInvoker,
和AbstractClusterInvoker的子类FailoverClusterInvoker为代表。
其中FailoverClusterInvoker是dubbo集群容错默认方案,Dubbo协议为默认协议。
先看下AbstractInvoker分支
public abstract class AbstractInvoker implements Invoker {
public Result invoke(Invocation inv) throws RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
Map context = RpcContext.getContext().getAttachments();
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
} catch (RpcException e) {
} catch (Throwable e) {
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
..
}
public class DubboInvoker extends AbstractInvoker {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
logger.debug("doInvoke start -----------------------------");
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}finally {
logger.debug("doInvoke end-----------------------------");
}
}
...
}
这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步
对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果
再看下AbstractClusterInvoker分支
public abstract class AbstractClusterInvoker implements Invoker {
public Result invoke(final Invocation invocation) throws RpcException {
checkWheatherDestoried();
LoadBalance loadbalance;
List> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected List> list(Invocation invocation) throws RpcException {
List> invokers = directory.list(invocation);
return invokers;
}
protected abstract Result doInvoke(Invocation invocation, List> invokers,
LoadBalance loadbalance) throws RpcException;
..
}
public class FailoverClusterInvoker extends AbstractClusterInvoker {
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
List> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers.
Set providers = new HashSet(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新检查一下
checkInvokers(copyinvokers, invocation);
}
Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
...
}
总结下AbstractClusterInvoker和AbstractClusterInvoker都需要实现Invoker接口,两者都声明了doInvoke抽象方法。但方法定义有区别。
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
protected abstract Result doInvoke(Invocation invocation, List> invokers,
LoadBalance loadbalance) throws RpcException;
AbstractClusterInvoker 需要一个Invoker列表,他们来自Directory,LoadBalance 可以可以理解为负载均衡策略。
两者的联系:AbstractClusterInvoker 比AbstractInvoker多了一些选择和负载均衡部分,到最后还是会调用AbstractInvoker分支,负责具体RPC调用工作。
1.4 常见容错方案对比
Feature
| 优点
| 缺点
| 实现类
|
Failover Cluster
| 失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用)
| 重试会带来更长延迟
| FailoverClusterInvoker |
Failfast Cluster
| 快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作
| 如果有机器正在重启,可能会出现调用失败
| FailfastClusterInvoker
|
Failsafe Cluster
| 失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作
| 调用信息丢失
| FailsafeClusterInvoker
|
Failback Cluster
| 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作
| 不可靠,重启丢失
| FailbackClusterInvoker |
Forking Cluster
| 并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作
| 需要浪费更多服务资源
| ForkingClusterInvoker |
Broadcast
Cluster | 广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态
| 速度慢,任意一台报错则报错
| BroadcastClusterInvoker |
2. 总结
*ClusterInvoker 分别使用Router,和Directory获取Invoker列表。
*ClusterInvoker然后再Invoker列表中,借助LoadBalance提供的负载均衡策略,返回一个可用的Invoker.
*Directory代表多个Invoker,可以理解为Invoker的逻辑集合,负责Invoker的下线,上线。
Router和Directory两者都对我提供Invoker列表。通过提供的API,很容易找出区别来。
//Directory
List> list(Invocation invocation) throws RpcException;
//Route
List> route(List> invokers, URL url, Invocation invocation) throws RpcException;
Directory:返回的Invoker列表,代表当前正常对外提供服务的Invoker列表。重点在维护可用节点列表。
Route:返回的Invoker列表,是在现有可用的Invoker列表里,按照规则进行再筛选,重点在规则匹配,过滤等。
本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1882850
Dubbo点滴之集群容错