请详细看代码中的注释
一、话不多说上时序图
二、分析解释
Application:就是我们在Debug代码的入口,本地可以从:org.apache.dubbo.demo.provider.Application进来,不过多讲解
ServiceConfig:这个是暴露服务的核心类,首先看export()
/**
* 暴露服务
*/
public synchronized void export() {
//校验并且更新替换配置
checkAndUpdateSubConfigs();
//从providerConfig中获取是否已经暴露过
if (!shouldExport()) {
return;
}
//是否延迟暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
//进入暴露
doExport();
}
}
解释:注释很清楚,主要是
校验并更新替换配置
判断是否延迟暴露,进入doExport()-->doExportUrls()
我们来看一下doExportUrls()方法:
private void doExportUrls() {
//加载注册中心URL;true:是服务提供者
List
// 循环协议暴露
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
//根据协议暴露
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
解释:主要是
获取注册中心URL,组装服务提供者URL
循环协议暴露服务
我们来看doExportUrlsFor1Protocol(protocolConfig,registryURLs)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
//协议名
String name = protocolConfig.getName();
//缺省dubbo
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
//--------------此处省去一大片--------
// export service
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
//本地暴露
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
//远程暴露
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
//加载监控中心的URL
URL mOnitorUrl= loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//ProxyFactory创建Invoker对象
Invoker> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//protocol暴露Invoker
Exporter> exporter = protocol.export(wrapperInvoker);
//添加到暴露服务集合
exporters.add(exporter);
}
} else {
//其它情况是直连的情况,不要用于生产
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
解释:这么一大溜子代码,我只讲暴露,主要看scope参数,分为三种情况
1、none:不暴露,这个、、、、一般没有
2、local:本地暴露自己看
3、remote:远程暴露--我们的重点(分为注册中心暴露,直连)
1、循环注册中心
2、proxyFactory创建Invoker对象
3、Protocol暴露Invoker
4、将暴露的服务添加到暴露服务集合
我们进入到65行
RegistryProtocol:protocol暴露Invoker----》protocol.export(wrapperInvoker)
public
//获取注册中心URL
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
//获取服务提供者URL
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
//本地暴露
final ExporterChangeableWrapper
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
//是否将自己注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
//将自己注册到注册中心
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//向注册中心注册服务提供者URl
exporter.setRegisterUrl(registeredProviderUrl);
//向注册中心提交订阅配置规则
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
解释主要干了这么些
获取注册中心URL
获取服务提供者的URL(自己的)
本地暴露
将自己注册到注册中心
向注册中心,注册服务提供者URL
向注册中心订阅配置规则
接下来看19行本地暴露doLocalExport()
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter
});
}
进入到DubboProtocol
DubboProtocol
public
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//启动服务器
openServer(url);
//初始化序列化优化器
optimizeSerialization(url);
return exporter;
}
我们主要看25行启动服务器openServer(url)
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
解释:看服务是否启动,启动了:重启,没启动进入12行createServer(url)
createServer(url):创建一个服务器
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
// 当服务关闭时发送只读时间
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
//默认启动心跳
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//启动服务器
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
主要看19行Exchangers.bind(url,requestHandler)
requestHandler:这个主要是封装了自己的ExchangeHandler,主要做用户处理请求、消息、链接、链接断开等操作
接下来进入Exchangers
Exchangers:信息交换层
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Transporters:数据传输层,这里主要看Netty4
/**
* 绑定一个服务器,静态方法
* @param url
* @param handlers
* @return
* @throws RemotingException
*/
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
跟进去就是NettyServer->AbstractServer()->doOpen();剩下的基本就是服务启动那一套
总结:整个过程就是服务暴露,总体感觉没什么用呢,只是看到了表象,层次分的比较好,这种上层抽象,然后特色业务各个封装,原谅我没get到提升的点