近日升级到hudi 0.11后,在flink应用中遭遇了一个神级异常:java.lang.ClassCastException: org.apache.hudi.common.fs.HoodieWrapperFileSystem cannot be cast to org.apache.hudi.common.fs.HoodieWrapperFileSystem。
没看错吧?同名类转换失败?揉揉眼睛,逐字符再看一次,的确是同名类。同类对象转换还失败了?真是活久见。
2022-05-30 14:40:27
java.lang.ClassCastException: org.apache.hudi.common.fs.HoodieWrapperFileSystem cannot be cast to org.apache.hudi.common.fs.HoodieWrapperFileSystemat org.apache.hudi.io.storage.HoodieParquetWriter.<init>(HoodieParquetWriter.java:72)at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:84)at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:70)at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:54)at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:101)at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:80)at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:67)at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:60)at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:479)at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:143)at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:184)at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:454)at org.apache.hudi.sink.StreamWriteFunction.endInput(StreamWriteFunction.java:151)at org.apache.hudi.sink.common.AbstractWriteOperator.endInput(AbstractWriteOperator.java:48)at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:100)at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)at java.lang.Thread.run(Thread.java:748)
二、异常分析
一般而言&#xff0c;ClassCastException是不同类之间的对象强行转换时才会引起的。那么在java里&#xff0c;怎么判断一个类是否相同呢&#xff1f;
我们知道&#xff0c;在java里&#xff0c;万物皆对象&#xff0c;我们开发的每一个类&#xff0c;一般情况下&#xff0c;运行时都有唯一一个对应的Class类对象。这个类对象在类首次被使用时由java虚拟机加载、创建。我们看Class类的equal方法源码&#xff0c;两个类对象是否相同&#xff0c;取决于是否为同一个对象引用。
public boolean equals(Object obj) {return (this &#61;&#61; obj);}
很明显&#xff0c;类对象由谁创建和持有&#xff0c;就影响到了类是否相同的判断及类对象强转。而这就涉及到了jvm原理和类的加载机制。
简单而言&#xff0c;jvm运行时会有一个专属的内存区域&#xff08;方法区或meta space)存放类的信息。类信息由jvm通过类加载器ClassLoader来加载。熟悉jvm原理的人都知道&#xff0c;默认情况下&#xff0c;java虚拟机使用双亲委派机制进行类加载。
双亲委派机制简述如下&#xff1a;
1、如果一个类加载器收到了类加载请求&#xff0c;它并不会自己先去加载&#xff0c;而是把这个请求委托给父类的加载器去执行。
2、如果父类加载器还存在其父类加载器&#xff0c;则进一步向上委托&#xff0c;依次递归请求最终将到达顶层的启动类加载器。
3、如果父类加载器可以完成类加载任务&#xff0c;就成功返回&#xff0c;倘若父类加载器无法完成此加载任务&#xff0c;子加载器才会尝试自己去加载&#xff0c;这就是双亲委派模式。
由此可以看出&#xff0c;在双亲委派机制下&#xff0c;可以保证一个类只会被加载一次&#xff0c;避免类的重复加载。
前面我们通过源码可以知道&#xff0c;类对象判同是通过类对象的指针是否相同来判断的。而类对象通过双亲委派机制由Classloader加载、创建。而类是通过全类名&#xff08;Class.forName&#xff09;来寻找、加载的。
由此可以推导出&#xff0c;在JVM中表示两个class对象是否是同一个类的两个必要条件&#xff1a;
1、类的全限制名一致&#xff08;包名&#43;类名&#xff09;。
2、加载这个类的ClassLoader必须相同。
由于我们遇到的是同名类强转异常&#xff0c;可以肯定我们类的全限定名是一致的。那么问题就是出在类的加载器身上。
我们知道&#xff0c;flink有一个配置项&#xff0c;classloader.resolve-order&#xff0c;可以配置类的加载顺序。classloader.resolve-order的默认配置是child-first&#xff0c;这一配置打破了java默认的双亲委派机制。
child-first配置对应的类加载器是ChildFirstClassLoader&#xff0c;这个加载器除了指定的系统类、java lang、flink源等基础类是遵守双亲委派由父加载器加载&#xff0c;其他的类是直接由ChildFirstClassLoader直接加载。这样做的一个好处是&#xff0c;当用户依赖于flink依赖有冲突时&#xff0c;用户可以优先使用自己包内在类进行加载。
ChildFirstClassLoader的构造器需要指定jar包资源urls、父加载器、父加载器加载的类模式、加载失败处理逻辑。
父加载器加载的类范围由assloader.parent-first-patterns.default指定&#xff0c;符合规则的由父加载器加载&#xff0c;否则由子加载器进行加载&#xff08;loadClassWithoutExceptionHandling方法&#xff09;。子加载器寻找jar包资源时&#xff08;getResource方法&#xff09;&#xff0c;也优先寻找用户资源&#xff0c;再寻找父加载器的资源。
assloader.parent-first-patterns.default默认为&#xff1a;
java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb。
ChildFirstClassLoader核心源码&#xff1a;
public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {/*** The classes that should always go through the parent ClassLoader. This is relevant for Flink* classes, for example, to avoid loading Flink classes that cross the user-code/system-code* barrier in the user-code ClassLoader.*/private final String[] alwaysParentFirstPatterns;public ChildFirstClassLoader(URL[] urls,ClassLoader parent,String[] alwaysParentFirstPatterns,Consumer<Throwable> classLoadingExceptionHandler) {super(urls, parent, classLoadingExceptionHandler);this.alwaysParentFirstPatterns &#61; alwaysParentFirstPatterns;}&#64;Overrideprotected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)throws ClassNotFoundException {// First, check if the class has already been loadedClass<?> c &#61; findLoadedClass(name);if (c &#61;&#61; null) {// check whether the class should go parent-firstfor (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {if (name.startsWith(alwaysParentFirstPattern)) {return super.loadClassWithoutExceptionHandling(name, resolve);}}try {// check the URLsc &#61; findClass(name);} catch (ClassNotFoundException e) {// let URLClassLoader do it, which will eventually call the parentc &#61; super.loadClassWithoutExceptionHandling(name, resolve);}} else if (resolve) {resolveClass(c);}return c;}&#64;Overridepublic URL getResource(String name) {// first, try and find it via the URLClassloaderURL urlClassLoaderResource &#61; findResource(name);if (urlClassLoaderResource !&#61; null) {return urlClassLoaderResource;}// delegate to superreturn super.getResource(name);}&#64;Overridepublic Enumeration<URL> getResources(String name) throws IOException {// first get resources from URLClassloaderEnumeration<URL> urlClassLoaderResources &#61; findResources(name);final List<URL> result &#61; new ArrayList<>();while (urlClassLoaderResources.hasMoreElements()) {result.add(urlClassLoaderResources.nextElement());}// get parent urlsEnumeration<URL> parentResources &#61; getParent().getResources(name);while (parentResources.hasMoreElements()) {result.add(parentResources.nextElement());}return new Enumeration<URL>() {Iterator<URL> iter &#61; result.iterator();public boolean hasMoreElements() {return iter.hasNext();}public URL nextElement() {return iter.next();}};}static {ClassLoader.registerAsParallelCapable();}}
由于flink默认使用ChildFirstClassLoader进行类加载&#xff0c;打破了源生的类加载机制&#xff0c;这可能会导致父加载类加载过的类&#xff0c;会被子加载器重复加载&#xff0c;进而导致同名类判同异常&#xff0c;影响对象强转&#xff0c;导致运行时出现同名类强转异常&#xff0c;抛ClassCastException。
它也会导致类的继承关系判断失效&#xff0c;导致子类转父类失败。因为父子类被不同的类加载器加载&#xff0c;同名父类被判断为不同类。异常示例如下&#xff1a;
Cannot cast org.apache.hudi.hive.HiveSyncConfig to org.apache.hudi.hive.replication.GlobalHiveSyncConfig
java.lang.ClassCastException: Cannot cast org.apache.hudi.hive.HiveSyncConfig to org.apache.hudi.hive.replication.GlobalHiveSyncConfig
三、异常解决方案
通过上述分析&#xff0c;在flink中遇到的同名类强转异常是由类加载器引起的。flink默认情况下&#xff0c;使用ChildFirstClassLoader进行加载。ChildFirstClassLoader加载类时&#xff0c;也优先使用用户的资源。一般情况下&#xff0c;这个机制是可以良好运行的。但有时也会出现同名类强转异常。这时在排除用户jar与flink运行依赖冲突后&#xff0c;若异常依然存在&#xff0c;可以修改flink的配置(conf/flink-conf.yaml)&#xff0c;将classloader.resolve-order改为parent-first&#xff0c;这样类就只会加载一次&#xff0c;避免重复加载出现强转异常。