热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

「Flink」工具人Flink之组件通信

Flink算子之间通过Netty交互,而组件之间的通信是

      最近,工具人突然被拉进了很多工作微信群。这些群有些共性:比如:

1,在这些群中,被动加入的都是工具人,而群主和他的朋友,都是工具的使用人;

2,在这些群中,工具的使用人各不相同,而工具就那么几个;

3,这些群,安静时是逢年过节,活跃时都在讨论用哪个工具人祭天;

4,这些群的名字,大部分都叫:XX需求爆肝群,XX问题背锅群,XX团队马屁群。

     所以从被迫进群的那一刻,工具人们就如同开始了一场绝地求生的游戏,不约而同地开启消息免打扰,尽量保持隐身。于是,工具人很困惑,明明已经有开会这个消磨时间的沟通方式了,为啥还要在微信中打磨工具人呢?

     在Flink中,同样让工具人困惑的是,之前学习了Flink的算子之间的数据传递过程,这部分的数据传递在Flink中是通过netty来实现的(相关阅读可以参见:「Flink」工具人之初学Flink-Task算子通信(一)和「Flink」工具人之初学Flink-Task算子通信(二)),而今天要学习的Flink组件之间的通信(如:JobMaster,TaskManager等)却是通过Akka来实现的。

在整个通信的设计的模型中,主要分为几个概念:


一,RpcService

RpcService的唯一实现就是AkkaRpcService,这就是组件事件通信的akka实体。

下图中,我们可以看到AkkaRpcService持有了进程的地址、服务端口,以及提供的一些核心服务。

在进程启动时,以JobManager为例:

ClusterEntrypoint.class

    private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
          //各项服务初始化,其中就包涵了AkkaService的初始化
          initializeServices(configuration);
          .....
    }
    }

    protected void initializeServices(Configuration configuration) throws Exception {


    LOG.info("Initializing cluster services.");


    synchronized (lock) {
    final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
    final String portRange = getRPCPortRange(configuration);
    //创建RpcService
    commOnRpcService= createRpcService(configuration, bindAddress, portRange);
    ......
    }
    }


    @Nonnull
    private RpcService createRpcService(Configuration configuration, String bindAddress, String portRange) throws Exception {
        //创建了AkkaRpcService
    return AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
    }

    二,RpcEndpoint是服务提供方,而RpcGateway是服务调用方的代理类。

    如下面的类图中,我们以JobMaster举例:


    JobMaster作为JobManager中的核心服务,他是服务提供方。由于他都受到了高可用的保护,会同时存在多个节点,所以,他都继承于FencedRpcEndpoint,并提供相应的RPC服务调用。

    同时JobMaster也继承了JobMasterGateway,通过继承JobMasterGateway来约束了JobMaster必须提供的接口契约,JobMasterGateway是TaskManager中的JobMaster的代理实例,将会被TaskManager调用。


    每个服务提供方(如:JobMaster)在初始化时,都会在基类RpcEndpoint的构造过程中,调用自动自己的Akka服务,创建ActorRef对象,绑定远端Gateway的动态代理,并返回了RpcServer对象。

    RpcEndpoint.class

      protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
      this.rpcService = checkNotNull(rpcService, "rpcService");
      this.endpointId = checkNotNull(endpointId, "endpointId");
          //调用AkkaRpcService创建本服务的ActorRef,并绑定远端Gateway的动态代理
      this.rpcServer = rpcService.startServer(this);


      this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
      }

      这里我们截取部分rpcService::startServer的源码略作说明:

      AkkaRpcService.class

        @Override
        public RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");
        .......
        ActorRef actorRef;
            //在这里创建了actorRef
        synchronized (lock) {
        checkState(!stopped, "RpcService is stopped");
        actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
        actors.put(actorRef, rpcEndpoint);
        }


        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());


        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option host = actorRef.path().address().host();
        if (host.isEmpty()) {
        hostname = "localhost";
        } else {
        hostname = host.get();
        }


        Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));


        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);


        final InvocationHandler akkaInvocationHandler;


        if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocatiOnHandler= new FencedAkkaInvocationHandler<>(
        akkaAddress,
        hostname,
        actorRef,
        configuration.getTimeout(),
        configuration.getMaximumFramesize(),
        terminationFuture,
        ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken);


        implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
        akkaInvocatiOnHandler= new AkkaInvocationHandler(
        akkaAddress,
        hostname,
        actorRef,
        configuration.getTimeout(),
        configuration.getMaximumFramesize(),
        terminationFuture);
        }


        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        //在这里创建了动态代理
        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]),
        akkaInvocationHandler);


        return server;
        }


        而当JobMaster启动时,则会最终调用rpcServer的start方法

        JobMaster.class

          /**
          * Start the rpc service and begin to run the job.
          *
          * @param newJobMasterId The necessary fencing token to run the job
          * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
          */
          public CompletableFuture start(final JobMasterId newJobMasterId) throws Exception {
          // make sure we receive RPC and async calls
          super.start();


          return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
          }

          这里的rpcEndpoint就是实际的ActorRef对象,向Actor发送了一个匿名的START指令,启动了endpoint。

          RpcEndpoint.class

            @Override
            public void start() {
            rpcEndpoint.tell(Processing.START, ActorRef.noSender());
            }

            三,连接过程

            当TaskManager收到了ResourceManager的创建资源的消息后,会主动通知JobMaster,然后JobMaster会发起到TaskExecutor的连接。

            JobLeaderService.class

              /**
              * Retrying registration for the job manager <--> task manager connection.
              */
              private static final class JobManagerRetryingRegistration
              extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {


              private final String taskManagerRpcAddress;


              private final TaskManagerLocation taskManagerLocation;


              JobManagerRetryingRegistration(
              Logger log,
              RpcService rpcService,
              String targetName,
              Class targetType,
              String targetAddress,
              JobMasterId jobMasterId,
              RetryingRegistrationConfiguration retryingRegistrationConfiguration,
              String taskManagerRpcAddress,
              TaskManagerLocation taskManagerLocation) {
              super(
              log,
              rpcService,
              targetName,
              targetType,
              targetAddress,
              jobMasterId,
              retryingRegistrationConfiguration);


              this.taskManagerRpcAddress = taskManagerRpcAddress;
              this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
              }


              @Override
              protected CompletableFuture invokeRegistration(
              JobMasterGateway gateway,
              JobMasterId jobMasterId,
              long timeoutMillis) throws Exception {

                    //通知JobMaster,来连接TaskManager
              return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
              }
              }

              这里其实就是通过动态代理AkkaInvocationHandler发起了远程调用。

              在动态代理AkkaInvocationHandler内部,会根据调用的方判断是走本地调用还是远程调用。

                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Class declaringClass = method.getDeclaringClass();


                Object result;


                if (declaringClass.equals(AkkaBasedEndpoint.class) ||
                declaringClass.equals(Object.class) ||
                declaringClass.equals(RpcGateway.class) ||
                declaringClass.equals(StartStoppable.class) ||
                declaringClass.equals(MainThreadExecutable.class) ||
                      declaringClass.equals(RpcServer.class)) {
                result = method.invoke(this, args);
                } else if (declaringClass.equals(FencedRpcGateway.class)) {
                throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
                "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
                "retrieve a properly FencedRpcGateway.");
                    } else {
                result = invokeRpc(method, args);
                    }
                return result;
                }

                在Flink中,有些场景看起来像是发起了一个Rpc调用,其实调用的还是本地服务,比如:Standalone模式下向ResourceManager申请资源等。所以,invokeRpc中将所有的调用参数统一封装为一个叫RpcInvocation的数据结构。

                它的两个子类LocalRpcInvocation和RemoteRpcInvocation 唯一的区别就是RemoteRpcInvocation 中的数据经过了序列化,方便网络传递。

                  protected RpcInvocation createRpcInvocationMessage(
                  final String methodName,
                  final Class[] parameterTypes,
                  final Object[] args) throws IOException {
                  final RpcInvocation rpcInvocation;


                  if (isLocal) {
                  rpcInvocation = new LocalRpcInvocation(
                  methodName,
                  parameterTypes,
                  args);
                  } else {
                  try {
                  RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
                  methodName,
                  parameterTypes,
                  args);


                  if (remoteRpcInvocation.getSize() > maximumFramesize) {
                  throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
                  } else {
                  rpcInvocation = remoteRpcInvocation;
                  }
                  } catch (IOException e) {
                  LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
                  throw e;
                  }
                  }


                  return rpcInvocation;
                  }


                  收到请求后 JobMaster连接TaskExecutor,并返回TaskExecutorGateway

                  JobMaster.class

                    @Override
                    public CompletableFuture registerTaskManager(
                    final String taskManagerRpcAddress,
                    final TaskManagerLocation taskManagerLocation,
                    final Time timeout) {


                    final ResourceID taskManagerId = taskManagerLocation.getResourceID();


                    if (registeredTaskManagers.containsKey(taskManagerId)) {
                    final RegistrationResponse respOnse= new JMTMRegistrationSuccess(resourceId);
                    return CompletableFuture.completedFuture(response);
                    } else {
                          // 连接TaskManager 并返回TaskExecutor
                    return getRpcService()
                    .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
                    .handleAsync(
                    (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    ......
                    },
                    getMainThreadExecutor());
                    }
                    }


                    总结一下,以上整体过程大致如下图所示:



                    最后,工具人表示,在数不清的微信群中疲于奔命,会影响工作效率和质量;工具人们是喝咖啡集中精力思考与创造的工程师,而不是喝着麻辣烫,服务那个伺候这个的保姆。



                    推荐阅读
                    • 在Linux系统中避免安装MySQL的简易指南
                      在Linux系统中避免安装MySQL的简易指南 ... [详细]
                    • 深入解析 Lifecycle 的实现原理
                      本文将详细介绍 Android Jetpack 中 Lifecycle 组件的实现原理,帮助开发者更好地理解和使用 Lifecycle,避免常见的内存泄漏问题。 ... [详细]
                    • 本文深入探讨了Java多线程环境下的同步机制及其应用,重点介绍了`synchronized`关键字的使用方法和原理。`synchronized`关键字主要用于确保多个线程在访问共享资源时的互斥性和原子性。通过具体示例,如在一个类中使用`synchronized`修饰方法,展示了如何实现线程安全的代码块。此外,文章还讨论了`ReentrantLock`等其他同步工具的优缺点,并提供了实际应用场景中的最佳实践。 ... [详细]
                    • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
                    • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
                    • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
                    • com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ... [详细]
                    • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
                    • 本文详细介绍了如何使用Python中的smtplib库来发送带有附件的邮件,并提供了完整的代码示例。作者:多测师_王sir,时间:2020年5月20日 17:24,微信:15367499889,公司:上海多测师信息有限公司。 ... [详细]
                    • 第二十五天接口、多态
                      1.java是面向对象的语言。设计模式:接口接口类是从java里衍生出来的,不是python原生支持的主要用于继承里多继承抽象类是python原生支持的主要用于继承里的单继承但是接 ... [详细]
                    • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
                      在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
                    • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
                      本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
                    • Spring框架中枚举参数的正确使用方法与技巧
                      本文详细阐述了在Spring Boot框架中正确使用枚举参数的方法与技巧,旨在帮助开发者更高效地掌握和应用枚举类型的数据传递,适合对Spring Boot感兴趣的读者深入学习。 ... [详细]
                    • 在Java基础中,私有静态内部类是一种常见的设计模式,主要用于防止外部类的直接调用或实例化。这种内部类仅服务于其所属的外部类,确保了代码的封装性和安全性。通过分析JDK源码,我们可以发现许多常用类中都包含了私有静态内部类,这些内部类虽然功能强大,但其复杂性往往让人感到困惑。本文将深入探讨私有静态内部类的作用、实现方式及其在实际开发中的应用,帮助读者更好地理解和使用这一重要的编程技巧。 ... [详细]
                    • 实验九:使用SharedPreferences存储简单数据
                      本实验旨在帮助学生理解和掌握使用SharedPreferences存储和读取简单数据的方法,包括程序参数和用户选项。 ... [详细]
                    author-avatar
                    lucky_笨鸟_660
                    这个家伙很懒,什么也没留下!
                    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
                    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有