热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink1.10定义UDAGG遇到SQL

按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu


按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException: SQL validation failed. null 问题

import org.apache.flink.configuration.JobManagerOptions
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
 
 
object testsql {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
 
    val tEnv = TableEnvironment.create(settings)
 
    tEnv.sqlUpdate("create function replaces as 'com.bigdata.util.udf.Replaces'")
    tEnv.sqlUpdate("create function avgprice as /'com.bigdata.util.udf.AvgPriceAgg/'")
 
    tEnv.sqlUpdate(getSourceSql)//创建数据源
    tEnv.sqlUpdate(getSinkSql)//创建写入表
    tEnv.sqlUpdate(processSql)//处理逻辑
    tEnv.execute("SQL Job")
  }
 
def getSourceSql = "CREATE TABLE order_info (...) with(...)"
 
def processSql = "INSERT INTO datasink select avgprice(a.price,a.total_count) as avg_price from order_info a group by a.item_id" 

def getSinkSql = "CREATE TABLE datasink (...) with(...)"

}

原来运行时的异常信息找不见了,以下是在单元测试的异常

org.apache.flink.table.api.ValidationException: SQL validation failed. null
 
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
    at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
    at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
    at TestAvgPriceAgg.TestAgg(TestAvgPriceAgg.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.table.functions.AggregateFunctionDefinition.(AggregateFunctionDefinition.java:48)
    at org.apache.flink.table.functions.FunctionDefinitionUtil.createFunctionDefinition(FunctionDefinitionUtil.java:57)
    at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:336)
    at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$2(FunctionCatalog.java:374)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:374)
    at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:303)
    at org.apache.flink.table.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:74)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1194)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1179)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1209)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1179)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:936)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:126)
    ... 30 more

大概意思就是sql校验没有通过,对照代码行数在执行processSql 这句的时候有问题,然后查看TableEnvironment发现只支持注册ScalarFunction,并且没有重载方法

Flink1.10定义UDAGG遇到SQL validation failed. null 问题

 查看源码发现TableEnvironment是顶级接口

Flink1.10定义UDAGG遇到SQL validation failed. null 问题

在实现上是 5 个面向用户的接口,在接口底层进行了不同的实现,5 个接口包括一个 TableEnvironment 接口,两个 BatchTableEnvironment 接口,两个 StreamTableEnvironment 接口,5 个接口文件完整路径如下:

org.apache.flink.table.api.TableEnvironment

org.apache.flink.table.api.java.BatchTableEnvironment

org.apache.flink.table.api.java.StreamTableEnvironment

org.apache.flink.table.api.scala.BatchTableEnvironment

org.apache.flink.table.api.scala.StreamTableEnvironment

Flink1.10定义UDAGG遇到SQL validation failed. null 问题

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings}
import org.apache.flink.table.api.java.StreamTableEnvironment
 
object tests {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
 
    val streamExecEnvirOnment= getStreamEnv
    val tEnv: StreamTableEnvirOnment= StreamTableEnvironment.create(streamExecEnvironment, settings)
    tEnv.sqlUpdate("create function replaces as 'com.bigdata.util.udf.Replaces'")
    tEnv.registerFunction("avgprice", new AvgPriceAgg())
 
    tEnv.sqlUpdate(getSourceSql)
    tEnv.sqlUpdate(getSinkSql)
    tEnv.sqlUpdate(processSql)
    tEnv.execute("SQL Job")
  }
 
  def getStreamEnv(): StreamExecutiOnEnvironment= {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    env.enableCheckpointing(60 * 1000 * 10, CheckpointingMode.EXACTLY_ONCE)
    val cOnfig= env.getCheckpointConfig
    //RETAIN_ON_CANCELLATION在job canceled的时候会保留externalized checkpoint state
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
    config.setMinPauseBetweenCheckpoints(60 * 1000 * 5)
    //用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
    config.setMaxConcurrentCheckpoints(1)
    //指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
    config.setCheckpointTimeout(60 * 1000 * 15)
    //用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行
    //https://issues.apache.org/jira/browse/FLINK-11662 1.10改为配置失败次数 配置false的话就默认最大2147483647
    config.setFailOnCheckpointingErrors(false)
    env
  }
def getSourceSql = "CREATE TABLE order_info (...) with(...)" 
def processSql = "INSERT INTO datasink select avgprice(a.price,a.total_count) as avg_price from order_info a group by a.item_id" 
def getSinkSql = "CREATE TABLE datasink (...) with(...)" 
}

参考文档:https://blog.csdn.net/weixin_44904816/article/details/102550056


推荐阅读
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
  • Java 中的 BigDecimal pow()方法,示例 ... [详细]
  • 在当前众多持久层框架中,MyBatis(前身为iBatis)凭借其轻量级、易用性和对SQL的直接支持,成为许多开发者的首选。本文将详细探讨MyBatis的核心概念、设计理念及其优势。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Java 中 Writer flush()方法,示例 ... [详细]
  • Java 类成员初始化顺序与数组创建
    本文探讨了Java中类成员的初始化顺序、静态引入、可变参数以及finalize方法的应用。通过具体的代码示例,详细解释了这些概念及其在实际编程中的使用。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 主要用了2个类来实现的,话不多说,直接看运行结果,然后在奉上源代码1.Index.javaimportjava.awt.Color;im ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文介绍了如何在C#中启动一个应用程序,并通过枚举窗口来获取其主窗口句柄。当使用Process类启动程序时,我们通常只能获得进程的句柄,而主窗口句柄可能为0。因此,我们需要使用API函数和回调机制来准确获取主窗口句柄。 ... [详细]
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 本文详细介绍了 Apache Jena 库中的 Txn.executeWrite 方法,通过多个实际代码示例展示了其在不同场景下的应用,帮助开发者更好地理解和使用该方法。 ... [详细]
author-avatar
清皮皮
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有