近日梳理了 FLINK 中定制类加载器和上传用户依赖的相关代码,加之恰好看到邮件列表有人在问 parent-first 和 child-first 两个不同策略的具体含义,于是想简单介绍一下定制类加载器在 FLINK 当中的实践以及提交 FLINK 作业时用户依赖的上传的一些细节,以帮助遇到相关问题的用户对这方面的设计和实现有一个初步的认识。
本文在介绍过程中会引用部分源码帮助理解,这是因为部分实现细节结合源码更好展开讨论的缘故,对应的 commit 号为 ea721c870057fbe6c9b3ab95138bf3e292c63625
。
FLINK 中的类加载策略
首先我们看到两个问题当中技术性和解决方案都比较明确的一个,即 FLINK 可配置的类加载策略。这里我们所说的类加载指的是通过类加载器(ClassLoader)定制化的类加载阶段中的“通过一个类的全限定名来获取描述此类的二进制字节流”的动作,parent-first 和 chid-first 对应的也是 FLINK 当中可以使用的两种类加载器。
这两种类加载器都是 URLClassLoader
的子类,即可以通过 URL 来指定搜寻全限定名对应的类的二进制流的类加载器。FLINK 中的用户依赖只有在运行时才能被确定下来,通过 URL 来指定用户依赖的地址也是许多 Java 应用的常见操作。
两种类加载器的不同点在于接受全限定名是加载类时检索可能的匹配二进制流的顺序,也就是不同的 loadClass
方法的实现。一个复杂的 FLINK 应用可能包括大量的依赖,而 FLINK 本身也是一个复杂的程序,也包括了大量的依赖。两个不同程序的依赖可能会产生冲突,例如,Kafka 在 FLINK 中的主要依赖版本是一个具体的小版本,而用户程序可能在更早的版本或者更新的版本。这些不同的版本在同一个类的方法接口上可能存在着不兼容的现象,即使用同一个全限定名,可能加载出来的二进制流不是用户程序中需要的那一个。这样,在运行用户代码的时候,就会出现各种依赖冲突导致的运行时异常。在这种情况下,我们需要让类加载器正确地加载用户提供的正确版本的依赖。
我们知道,Java 推荐类加载器遵循所谓的双亲委派模型。这一模型的工作方式是,如果一个类加载器收到了类加载的请求,它首先不会自己去尝试加载这个类,而是把这个请求委派给父类加载器去完成,每一个层次的类加载器都是如此,因此所有的加载请求最终都应该传递到顶层的类加载器即 Bootstrap ClassLoader 中,只有当父加载器反馈自己无法完成这个加载请求时,即它的搜索范围中没有找到所需的类时,子加载器才会尝试自己去加载。
在 FLINK 的上下文中,FLINK 集群运行着 FLINK 的框架代码,这些代码包括 FLINK 的各种依赖。用户程序作为客体,要将用户的依赖扩充到类加载器中,为了加载 FLINK 提供的基础抽象,需要将 FLINK 框架的类加载器作为父加载器来构建自己的类加载器。但是根据双亲委派模型,这样在解析全限定名时会先找父类加载器中查找,一旦依赖冲突,将加载到错误的类,进而引发异常。
这个双亲委派模型的实现我们可以简单看下 JDK 中的源码(为了阅读方面已做相关处理)
protected Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {synchronized (getClassLoadingLock(name)) {// First, check if the class has already been loadedClass> c = findLoadedClass(name);if (c == null) {try {if (parent != null) {c = parent.loadClass(name, false);} else {c = findBootstrapClassOrNull(name);}} catch (ClassNotFoundException e) {// ClassNotFoundException thrown if class not found// from the non-null parent class loader}if (c == null) {// If still not found, then invoke findClass in order// to find the class.c = findClass(name);}}if (resolve) {resolveClass(c);}return c;}}
这里我们不管 resolve 相关的问题,主要看到类加载的过程,结合注释应该可以理解确实和上面所说的流程一致。
对于我们上面提到的问题,我们不希望使用双亲委派模型来加载依赖,而是首先尝试加载用户提供的依赖。为此,FLINK 提供了 child-first 的类加载策略,这次我们先看代码再介绍过程
public final class ChildFirstClassLoader extends URLClassLoader {@Overrideprotected synchronized Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {// First, check if the class has already been loadedClass> c = findLoadedClass(name);if (c == null) {// check whether the class should go parent-firstfor (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {if (name.startsWith(alwaysParentFirstPattern)) {return super.loadClass(name, resolve);}}try {// check the URLsc = findClass(name);} catch (ClassNotFoundException e) {// let URLClassLoader do it, which will eventually call the parentc = super.loadClass(name, resolve);}}if (resolve) {resolveClass(c);}return c;}
}
可以看到代码逻辑非常的简单,首先看到有个 alwaysParentFirstPatterns
的过滤器,用来保证 Java 和 Scala 的内部类和 FLINK 的基础类不会被子加载器篡改,如果与这个过滤器无关,才会进入到真正 child-first 的逻辑里面。 首先查看类是否已经加载,如果没有加载的话,不是像默认行为那样先 parent.loadClass
再 finaClass
,而是先 findClass
再 loadClass
,就是这么简单,我们就扭转了加载的顺序。
此外,child-first 还涉及类加载器的另一个 getResource
的功能,这个跟获取 classpath 中的资源文件有关,具体可以自行查询。默认也有双亲委派的 parent-first 和可配置的 child-first 解析顺序。
FLINK 中的依赖上传实现
这一节主要介绍的是 FLINK 在提交用户作业时如何将用户作业所需的依赖也传输到执行框架代码的环境中。本节内容就没有太多技术上的取舍了,纯粹是设计的故意和琐碎的细节。
首先值得一提的是,在 FLINK 1.9 以前的代码中,child-frist 这个配置只在 session 模式下是有用的。这是因为 YARN PerJob 模式下,所有的用户依赖都在系统 classpath 中随着 JobClusterEntrypoint 的执行而被加载,有一个 hack 的 yarn.per-job-cluster.include-user-jar
配置可以指定用户依赖的排列顺序,从而某种程度上解决 child-first 的需求,但是这个与干净的 ClassLoader 方案特别是在 alwaysParentFirstPattern
和概念区分上还是有差距。
此外 Client 提交的过程中,用于编译的 ClassLoader 只有 parent-first 的,这会导致莫名其妙的 JobGraph 编译失败,而使用 PerJob 和 yarn.per-job-cluster.include-user-jar: FIRST
却能成功的情况。
这两个问题分别由 FLINK-14466 和 FLINK-13749 解决,希望在更新的版本中能够为用户带来更好的体验。
在描述完一些奇怪的问题之后,我们简单介绍一下依赖上传的实际搞法。
对于 Session 模式来说,Dispatcher 会起一个 BlobServer 接受客户端传输的依赖,保存在本地并根据 HA 配置选择是否复制到 HDFS 上。这个流程可以参考 ClusterClient#submitJob
中上传作业依赖的部分,以及 flink-runtime 下 ClientUtils
的相关实用类,基本都是干这个事的。在启动作业时,JobManager 会起一个叫 LibraryCacheManager
的玩意,这个东西的 getClassLoader
方法就搞定了用户启动时需要的类加载器。如果配置成 child-first,那就根据上传的 URL 配出一个 child-first 的类加载器。
对于 1.9 及之前的 PerJob 模式来说,这个配置是失效的,通过上面提到的选项在部署 YARN AM 的时候通过不同的方式拼接 classpath 来达到类似于 child-first 的效果,这部分代码可以查看 YarnClusterDescriptor#startAppMaster
的相关内容,这部分方法属于大泥球代码,祝你好运!
FLINK-14466 通过在 PerJob 模式下将用户依赖 ship 到一个固定的目录下,在启动的时候动目录中加载出的依赖就算作用户依赖来解决这个问题。另一种可行的解决方案是部署 YARN AM 时依赖都 ship 但是只把系统依赖写在 classpath 里,把用户依赖写在一个配置字符串里,在启动的时候再从配置字符串中读取恢复。这两种方案都能使得 PerJob 和 Session 在除了作业提交方式外的各种层面上表现得更加一致。
这方面的相关资料我才发现官方文档 Debugging Classloading 已经说得蛮多的了,英文好的同学也可以读一读。里面明确的提到了 YARN PerJob 下 child-first 配置不起效等问题。