使用以下代码测试自定义的 UDAGG(用户自定义聚合函数)时,始终报出 org.apache.flink.table.api.ValidationException: SQL validation failed. null 异常:
import org.apache.flink.configuration.JobManagerOptions
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

/**
 * Reproduction of the failure: the aggregate UDF `avgprice` is registered through a
 * `CREATE FUNCTION` DDL statement on the unified [[TableEnvironment]]
 * (Blink planner, streaming mode).
 *
 * Executing `processSql` with this setup fails SQL validation with
 * `ValidationException: SQL validation failed. null`. The unified `TableEnvironment`
 * only exposes `registerFunction` for scalar functions, so the aggregate cannot be
 * registered programmatically on it either.
 */
object testsql {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Scalar UDF registered via DDL.
    tEnv.sqlUpdate("create function replaces as 'com.bigdata.util.udf.Replaces'")
    // Aggregate UDF registered via DDL. This is the registration under investigation:
    // using `avgprice` in processSql later fails validation.
    tEnv.sqlUpdate("create function avgprice as 'com.bigdata.util.udf.AvgPriceAgg'")

    tEnv.sqlUpdate(getSourceSql) // create the source table
    tEnv.sqlUpdate(getSinkSql)   // create the sink table
    tEnv.sqlUpdate(processSql)   // processing logic (INSERT INTO ... SELECT)
    tEnv.execute("SQL Job")
  }

  /** DDL for the source table `order_info` (columns and connector options elided). */
  def getSourceSql: String = "CREATE TABLE order_info (...) with(...)"

  /** Aggregates `avgprice(price, total_count)` per `item_id` into `datasink`. */
  def processSql: String =
    "INSERT INTO datasink select avgprice(a.price,a.total_count) as avg_price from order_info a group by a.item_id"

  /** DDL for the sink table `datasink` (columns and connector options elided). */
  def getSinkSql: String = "CREATE TABLE datasink (...) with(...)"
}
原始运行时的异常信息已经找不到了,以下是在单元测试中复现出的异常:
org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) at TestAvgPriceAgg.TestAgg(TestAvgPriceAgg.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at org.apache.flink.table.functions.AggregateFunctionDefinition.<init>(AggregateFunctionDefinition.java:48) at org.apache.flink.table.functions.FunctionDefinitionUtil.createFunctionDefinition(FunctionDefinitionUtil.java:57) at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:336) at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$2(FunctionCatalog.java:374) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:374) at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:303) at org.apache.flink.table.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:74) at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1194) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1179) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1209) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1179) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:936) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650) at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:126) ... 30 more
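大意是 SQL 校验没有通过。对照代码行号可知,问题出在执行 processSql 这一句。从 Caused by 部分可以看到,空指针来自 FunctionDefinitionUtil.createFunctionDefinition 构造 AggregateFunctionDefinition 时的 Preconditions.checkNotNull。也就是说,通过 DDL(create function)注册的聚合函数在生成函数定义时,有某个必需参数为空(推测是结果类型或累加器类型信息)。继续查看 TableEnvironment 发现,它的 registerFunction 只支持注册 ScalarFunction,没有针对聚合函数的重载方法。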
查看源码可知,TableEnvironment 是顶层接口。
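Table API 一共提供 5 个面向用户的接口,各自在底层有不同的实现:1 个 TableEnvironment 接口、2 个 BatchTableEnvironment 接口和 2 个 StreamTableEnvironment 接口(Java 版与 Scala 版各一个)。其中 StreamTableEnvironment 提供了注册 AggregateFunction 的 registerFunction 重载方法,因此改用 StreamTableEnvironment,并通过 registerFunction 注册 UDAGG,即可绕过上述问题(见后面的代码)。5 个接口文件的完整路径如下: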
org.apache.flink.table.api.TableEnvironment
org.apache.flink.table.api.java.BatchTableEnvironment
org.apache.flink.table.api.java.StreamTableEnvironment
org.apache.flink.table.api.scala.BatchTableEnvironment
org.apache.flink.table.api.scala.StreamTableEnvironment
import com.bigdata.util.udf.AvgPriceAgg
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.java.StreamTableEnvironment

/**
 * Working variant: the aggregate UDF is registered programmatically on a
 * [[StreamTableEnvironment]], which has a `registerFunction` overload for
 * aggregate functions. The unified `TableEnvironment` does not have this overload.
 * The scalar UDF is still registered via DDL.
 */
object tests {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val streamExecEnvironment = getStreamEnv()
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamExecEnvironment, settings)

    tEnv.sqlUpdate("create function replaces as 'com.bigdata.util.udf.Replaces'")
    // Register the UDAGG through the API instead of a CREATE FUNCTION DDL.
    tEnv.registerFunction("avgprice", new AvgPriceAgg())

    tEnv.sqlUpdate(getSourceSql) // create the source table
    tEnv.sqlUpdate(getSinkSql)   // create the sink table
    tEnv.sqlUpdate(processSql)   // processing logic (INSERT INTO ... SELECT)
    tEnv.execute("SQL Job")
  }

  /**
   * Builds the streaming execution environment with exactly-once checkpointing.
   *
   * @return the configured [[StreamExecutionEnvironment]]
   */
  def getStreamEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 10 minutes with exactly-once semantics.
    env.enableCheckpointing(60 * 1000 * 10, CheckpointingMode.EXACTLY_ONCE)
    val config = env.getCheckpointConfig
    // RETAIN_ON_CANCELLATION: keep the externalized checkpoint state when the job is cancelled.
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Minimum time the checkpoint coordinator waits after one checkpoint completes before
    // triggering the next one (5 minutes). When set, maxConcurrentCheckpoints is effectively 1.
    config.setMinPauseBetweenCheckpoints(60 * 1000 * 5)
    // Maximum number of checkpoints in flight. If minPauseBetweenCheckpoints is set,
    // values greater than 1 have no effect.
    config.setMaxConcurrentCheckpoints(1)
    // Checkpoint timeout in milliseconds (15 minutes); checkpoints not completed in time are aborted.
    config.setCheckpointTimeout(60 * 1000 * 15)
    // Do not fail the task when a checkpoint fails. Since FLINK-11662 (Flink 1.10) this is
    // configured as a tolerated failure count. The deprecated setFailOnCheckpointingErrors(false)
    // corresponds to the maximum count, 2147483647 (Int.MaxValue).
    // https://issues.apache.org/jira/browse/FLINK-11662
    config.setTolerableCheckpointFailureNumber(Int.MaxValue)
    env
  }

  /** DDL for the source table `order_info` (columns and connector options elided). */
  def getSourceSql: String = "CREATE TABLE order_info (...) with(...)"

  /** Aggregates `avgprice(price, total_count)` per `item_id` into `datasink`. */
  def processSql: String =
    "INSERT INTO datasink select avgprice(a.price,a.total_count) as avg_price from order_info a group by a.item_id"

  /** DDL for the sink table `datasink` (columns and connector options elided). */
  def getSinkSql: String = "CREATE TABLE datasink (...) with(...)"
}
参考文档:https://blog.csdn.net/weixin_44904816/article/details/102550056