热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Dubbo源码解析服务提供者是怎么暴露服务的?

请详细看代码中的注释一、话不多说上时序图二、分析解释Application:就是我们在D

请详细看代码中的注释

一、话不多说上时序图

二、分析解释

  1. Application:就是我们在Debug代码的入口,本地可以从:org.apache.dubbo.demo.provider.Application进来,不过多讲解

  2. ServiceConfig:这个是暴露服务的核心类,首先看export()

      /**
      * 暴露服务
      */
      public synchronized void export() {
            //校验并且更新替换配置
      checkAndUpdateSubConfigs();
      //从providerConfig中获取是否已经暴露过
      if (!shouldExport()) {
      return;
      }
      //是否延迟暴露
      if (shouldDelay()) {
      DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
      } else {
      //进入暴露
      doExport();
      }
      }

    1. 解释:注释很清楚,主要是

      1. 校验并更新替换配置

      2. 判断是否延迟暴露,进入doExport()-->doExportUrls()

    2. 我们来看一下doExportUrls()方法:

        private void doExportUrls() {
        //加载注册中心URL;true:是服务提供者
        List registryURLs = loadRegistries(true);
        // 循环协议暴露
        for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        //根据协议暴露
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
        }

      1. 解释:主要是

        1. 获取注册中心URL,组装服务提供者URL

        2. 循环协议暴露服务

    3. 我们来看doExportUrlsFor1Protocol(protocolConfig,registryURLs)

      private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
      //协议名
      String name = protocolConfig.getName();
      //缺省dubbo
      if (StringUtils.isEmpty(name)) {
      name = DUBBO;
      }


              //--------------此处省去一大片--------



      // export service
      String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
      Integer port = this.findConfigedPorts(protocolConfig, name, map);
      URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);


      if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
      .hasExtension(url.getProtocol())) {
      url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
      .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
      }


      String scope = url.getParameter(SCOPE_KEY);
      // don't export when none is configured
      if (!SCOPE_NONE.equalsIgnoreCase(scope)) {


      // export to local if the config is not remote (export to remote only when config is remote)
      //本地暴露
      if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
      exportLocal(url);
      }
      // export to remote if the config is not local (export to local only when config is local)
      //远程暴露
      if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
      if (CollectionUtils.isNotEmpty(registryURLs)) {
      for (URL registryURL : registryURLs) {
      //if protocol is only injvm ,not register
      if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
      continue;
      }
      // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
      url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
      //加载监控中心的URL
      URL mOnitorUrl= loadMonitor(registryURL);
      if (monitorUrl != null) {
      url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
      }
      if (logger.isInfoEnabled()) {
      if (url.getParameter(REGISTER_KEY, true)) {
      logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
      } else {
      logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
      }
      }


      // For providers, this is used to enable custom proxy to generate invoker
      String proxy = url.getParameter(PROXY_KEY);
      if (StringUtils.isNotEmpty(proxy)) {
      registryURL = registryURL.addParameter(PROXY_KEY, proxy);
      }
      //ProxyFactory创建Invoker对象
      Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
      DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
      //protocol暴露Invoker
      Exporter exporter = protocol.export(wrapperInvoker);
      //添加到暴露服务集合
      exporters.add(exporter);
      }
      } else {
      //其它情况是直连的情况,不要用于生产
      if (logger.isInfoEnabled()) {
      logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
      }
      Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
      DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);


      Exporter exporter = protocol.export(wrapperInvoker);
      exporters.add(exporter);
      }
      /**
      * @since 2.7.0
      * ServiceData Store
      */
      MetadataReportService metadataReportService = null;
      if ((metadataReportService = getMetadataReportService()) != null) {
      metadataReportService.publishProvider(url);
      }
      }
      }
      this.urls.add(url);
      }

      解释:这么一大溜子代码,我只讲暴露,主要看scope参数,分为三种情况

      1、none:不暴露,这个、、、、一般没有

      2、local:本地暴露自己看

      3、remote:远程暴露--我们的重点(分为注册中心暴露,直连)

          1、循环注册中心

          2、proxyFactory创建Invoker对象

          3、Protocol暴露Invoker

          4、将暴露的服务添加到暴露服务集合

      我们进入到65行


  3. RegistryProtocol:protocol暴露Invoker----》protocol.export(wrapperInvoker)

      public Exporter export(final Invoker originInvoker) throws RpcException {
      //获取注册中心URL
      URL registryUrl = getRegistryUrl(originInvoker);
      // url to export locally
      //获取服务提供者URL
      URL providerUrl = getProviderUrl(originInvoker);


      // Subscribe the override data
      // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
      // the same service. Because the subscribed is cached key with the name of the service, it causes the
      // subscription information to cover.
      final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
      final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
      overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


      providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
      //export invoker
      //本地暴露
      final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);


      // url to registry
      final Registry registry = getRegistry(originInvoker);
      final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
      ProviderInvokerWrapper providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
      registryUrl, registeredProviderUrl);
      //to judge if we need to delay publish
      //是否将自己注册到注册中心
      boolean register = providerUrl.getParameter(REGISTER_KEY, true);
      //将自己注册到注册中心
      if (register) {
      register(registryUrl, registeredProviderUrl);
      providerInvokerWrapper.setReg(true);
      }


      // Deprecated! Subscribe to override rules in 2.6.x or before.
      registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
      //向注册中心注册服务提供者URl
      exporter.setRegisterUrl(registeredProviderUrl);
      //向注册中心提交订阅配置规则
      exporter.setSubscribeUrl(overrideSubscribeUrl);
      //Ensure that a new exporter instance is returned every time export
      return new DestroyableExporter<>(exporter);
      }

    1. 解释主要干了这么些

      1. 获取注册中心URL

      2. 获取服务提供者的URL(自己的)

      3. 本地暴露

      4. 将自己注册到注册中心

      5. 向注册中心,注册服务提供者URL

      6. 向注册中心订阅配置规则

    2. 接下来看19行本地暴露doLocalExport()

        private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);


        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter) protocol.export(invokerDelegate), originInvoker);
        });
        }

      1. 进入到DubboProtocol

  4. DubboProtocol

      public Exporter export(Invoker invoker) throws RpcException {
      URL url = invoker.getUrl();


      // export service.
      String key = serviceKey(url);
      DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
      exporterMap.put(key, exporter);


      //export an stub service for dispatching event
      Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
      Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
      if (isStubSupportEvent && !isCallbackservice) {
      String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
      if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
      if (logger.isWarnEnabled()) {
      logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
      "], has set stubproxy support event ,but no stub methods founded."));
      }


      } else {
      stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
      }
      }
      //启动服务器
      openServer(url);
      //初始化序列化优化器
      optimizeSerialization(url);


      return exporter;
      }

    1. 我们主要看25行启动服务器openServer(url)

        private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
        synchronized (this) {
        server = serverMap.get(key);
        if (server == null) {
        serverMap.put(key, createServer(url));
        }
        }
        } else {
        // server supports reset, use together with override
        server.reset(url);
        }
        }
        }

      1. 解释:看服务是否启动,启动了:重启,没启动进入12行createServer(url)

    2. createServer(url):创建一个服务器

        private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
        // send readonly event when server closes, it's enabled by default
        // 当服务关闭时发送只读时间
        .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
        // enable heartbeat by default
        //默认启动心跳
        .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
        .addParameter(CODEC_KEY, DubboCodec.NAME)
        .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);


        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
        //启动服务器
        ExchangeServer server;
        try {
        server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }


        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
        throw new RpcException("Unsupported client type: " + str);
        }
        }


        return server;
        }

      1. 主要看19行Exchangers.bind(url,requestHandler)

        1. requestHandler:这个主要是封装了自己的ExchangeHandler,主要做用户处理请求、消息、链接、链接断开等操作

        2. 接下来进入Exchangers

  5. Exchangers:信息交换层

      public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
      if (url == null) {
      throw new IllegalArgumentException("url == null");
      }
      if (handler == null) {
      throw new IllegalArgumentException("handler == null");
      }
      url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
      return getExchanger(url).bind(url, handler);
      }

  6. Transporters:数据传输层,这里主要看Netty4

      /**
      * 绑定一个服务器,静态方法
      * @param url
      * @param handlers
      * @return
      * @throws RemotingException
      */
      public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
      if (url == null) {
      throw new IllegalArgumentException("url == null");
      }
      if (handlers == null || handlers.length == 0) {
      throw new IllegalArgumentException("handlers == null");
      }
      ChannelHandler handler;
      if (handlers.length == 1) {
      handler = handlers[0];
      } else {
      handler = new ChannelHandlerDispatcher(handlers);
      }
      return getTransporter().bind(url, handler);
      }

    1. 跟进去就是NettyServer->AbstractServer()->doOpen();剩下的基本就是服务启动那一套


总结:整个过程就是服务暴露,总体感觉没什么用呢,只是看到了表象,层次分的比较好,这种上层抽象,然后特色业务各个封装,原谅我没get到提升的点



推荐阅读
author-avatar
安彬2502936127
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有