热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqlinteger字置为空_FlinkTableApiamp;SQL初体验,Blink的使用

FlinkTableApi&SQL初体验,Blink的使用概述Flink具有TableAPI和SQL-用于统一流和批处理。TableAPI是用于Scala和Java的语言集成查询A
49d85efe2e481fa5fc4f441bb03f99ac.png

Flink Table Api & SQL 初体验,Blink的使用

概述

Flink具有Table API和SQL-用于统一流和批处理。
Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。
Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看 官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3

org.apache.flinkflink-java${flink.version}providedorg.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-table-api-java-bridge_2.11${flink.version}org.apache.flinkflink-table-planner-blink_2.11${flink.version}org.apache.flinkflink-table-api-java${flink.version}

模拟一个实时流

import lombok.Data;
@Data
public class Product {public Integer id;public String seasonType;
}

自定义Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.ArrayList;
import java.util.Random;public class ProductStremingSource implements SourceFunction {private boolean isRunning = true;@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning){// 每一秒钟产生一条数据Product product = generateProduct();ctx.collect(product);Thread.sleep(1000);}}private Product generateProduct(){int i = new Random().nextInt(100);ArrayList list = new ArrayList();list.add("spring");list.add("summer");list.add("autumn");list.add("winter");Product product = new Product();product.setSeasonType(list.get(new Random().nextInt(4)));product.setId(i);return product;}@Overridepublic void cancel() {}
}

主程序

public class TableStremingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();// 使用BlinkEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);SingleOutputStreamOperator source = bsEnv.addSource(new MyStremingSource()).map(new MapFunction() {@Overridepublic Item map(Item value) throws Exception {return value;}});// 分割流final OutputTag even = new OutputTag("even") {};final OutputTag old = new OutputTag("old") {};SingleOutputStreamOperator sideOutputData = source.process(new ProcessFunction() {@Overridepublic void processElement(Item value, Context ctx, Collector out) throws Exception {if (value.getId() % 2 == 0) {ctx.output(even,value);}else{ctx.output(old,value);}}});DataStream evenStream = sideOutputData.getSideOutput(even);DataStream oldStream = sideOutputData.getSideOutput(old);// 注册两个 表 : evenTable,oddTablebsTableEnv.registerDataStream("evenTable",evenStream , "name,id");bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");// 执行sql 输出TableTable queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");queryTable.printSchema();;// 获取流DataStream>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint>(){}));dataStream.print();bsEnv.execute("demo");}
}

结果打印

90d5cd5d364b5ac9970044cbbc2ea8d3.png

输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com

扫码关注公众号《ipoo》

49aaf26f79d54b482bb596bea111fcef.png



推荐阅读
  • 本文探讨了利用Java实现WebSocket实时消息推送技术的方法。与传统的轮询、长连接或短连接等方案相比,WebSocket提供了一种更为高效和低延迟的双向通信机制。通过建立持久连接,服务器能够主动向客户端推送数据,从而实现真正的实时消息传递。此外,本文还介绍了WebSocket在实际应用中的优势和应用场景,并提供了详细的实现步骤和技术细节。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 本文探讨了 Java 中 Pair 类的历史与现状。虽然 Java 标准库中没有内置的 Pair 类,但社区和第三方库提供了多种实现方式,如 Apache Commons 的 Pair 类和 JavaFX 的 javafx.util.Pair 类。这些实现为需要处理成对数据的开发者提供了便利。此外,文章还讨论了为何标准库未包含 Pair 类的原因,以及在现代 Java 开发中使用 Pair 类的最佳实践。 ... [详细]
  • 本文深入探讨了CGLIB BeanCopier在Bean对象复制中的应用及其优化技巧。相较于Spring的BeanUtils和Apache的BeanUtils,CGLIB BeanCopier在性能上具有显著优势。通过详细分析其内部机制和使用场景,本文提供了多种优化方法,帮助开发者在实际项目中更高效地利用这一工具。此外,文章还讨论了CGLIB BeanCopier在复杂对象结构和大规模数据处理中的表现,为读者提供了实用的参考和建议。 ... [详细]
  • 导读:本篇文章编程笔记来给大家介绍有关php怎么遍历对象的相关内容,希望对大家有所帮助,一起来看看吧。本文目录一览:1、如何用php将数 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 在使用 Qt 进行 YUV420 图像渲染时,由于 Qt 本身不支持直接绘制 YUV 数据,因此需要借助 QOpenGLWidget 和 OpenGL 技术来实现。通过继承 QOpenGLWidget 类并重写其绘图方法,可以利用 GPU 的高效渲染能力,实现高质量的 YUV420 图像显示。此外,这种方法还能显著提高图像处理的性能和流畅性。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 在稀疏直接法视觉里程计中,通过优化特征点并采用基于光度误差最小化的灰度图像线性插值技术,提高了定位精度。该方法通过对空间点的非齐次和齐次表示进行处理,利用RGB-D传感器获取的3D坐标信息,在两帧图像之间实现精确匹配,有效减少了光度误差,提升了系统的鲁棒性和稳定性。 ... [详细]
  • 在使用sbt构建项目时,遇到了“对象apache不是org软件包的成员”的错误。本文详细分析了该问题的原因,并提供了有效的解决方案,包括检查依赖配置、清理缓存和更新sbt插件等步骤,帮助开发者快速解决问题。 ... [详细]
  • 1.html页面如何使用swiper对swiper不熟练的小伙伴们可能不知道怎么开始使用它,那么下面就让我来简单讲述一下关于swiper的使用流程,这 ... [详细]
author-avatar
秋秋
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有