热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinksqlclentMATCH_RECOGNIZEkafka例子

flinksql-clentMATCH_RECOGNIZEkafka例子:环境flink1.7.2增加flink1.7.2的lib中的jar,否则会报类找不到avro-1.8.2.
环境 flink1.7.2

  1. 增加flink1.7.2 的lib 中的jar, 否则会报类找不到

    avro-1.8.2.jar flink-connector-kafka-0.10_2.12-1.7.2.jar flink-connector-kafka-base_2.12-1.7.2.jar flink-json-1.7.2.jar kafka-clients-0.11.0.0.jar flink-avro-1.7.2.jar flink-connector-kafka-0.11_2.12-1.7.2.jar flink-core-1.7.2.jar flink-python_2.12-1.7.2.jar log4j-1.2.17.jar flink-cep_2.12-1.7.2.jar flink-connector-kafka-0.9_2.12-1.7.2.jar flink-dist_2.12-1.7.2.jar flink-table_2.12-1.7.2.jar slf4j-log4j12-1.7.15.jar

  2. 修改 sql-client-defaults.yaml 中的table 值

tables: - name: myTable type: source update-mode: append connector: property-version: 1 type: kafka version: 0.11 topic: im-message-topic2 startup-mode: earliest-offset properties: - key: bootstrap.servers value: kafkaip:9092 - key: group.id value: testGroup format: property-version: 1 type: json schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING, msg STRING, timestampSend STRING)" schema: - name: sessionId type: STRING - name: fromUid type: STRING - name: toUid type: STRING - name: chatType type: STRING - name: type type: STRING - name: msgId type: STRING - name: msg type: STRING - name: rowTime type: TIMESTAMP rowtime: timestamps: type: "from-field" from: "timestampSend" watermarks: type: "periodic-bounded" delay: "60" - name: procTime type: TIMESTAMP proctime: true

  1. 运行

./bin/sql-client.sh embedded select * from myTable;

技术图片

然后使用 MATCH_RECOGNIZE 的sql

SELECT * FROM myTable MATCH_RECOGNIZE ( PARTITION BY sessionId ORDER BY rowTime MEASURES e2.procTime as answerTime, LAST(e1.procTime) as customer_event_time, e2.fromUid as empUid, e1.procTime as askTime, 1 as total_talk ONE ROW PER MATCH AFTER MATCH SKIP TO LAST e2 PATTERN (e1 e2) DEFINE e1 as e1.type = ‘yonghu‘, e2 as e2.type = ‘guanjia‘ );

上面是使用sql-client 不用谢代码,当然也可以写代码,下面是对应的程序

public static void main(String[] arg) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tableEnv.connect(new Kafka() .version("0.11") .topic("im-message-topic3") //.property("zookeeper.connect","") .property("bootstrap.servers","kafkaip:9092") .startFromEarliest() .sinkPartitionerRoundRobin()//Flink分区随机映射到kafka分区 ).withFormat(new Json() .failOnMissingField(false) .deriveSchema() ).withSchema(new Schema() .field("sessionId", Types.STRING).from("sessionId") .field("fromUid", Types.STRING).from("fromUid") .field("toUid", Types.STRING).from("toUid") .field("chatType", Types.STRING).from("chatType") .field("type", Types.STRING).from("type") .field("msgId", Types.STRING).from("msgId") .field("msg", Types.STRING).from("msg") // .field("timestampSend", Types.SQL_TIMESTAMP) .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("timestampSend") .watermarksPeriodicBounded(1000) ) .field("proctime", Types.SQL_TIMESTAMP).proctime() ).inAppendMode().registerTableSource("myTable"); Table tb2 = tableEnv.sqlQuery( "SELECT " + "answerTime, customer_event_time, empUid, noreply_counts, total_talk " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.rowtime as answerTime, "+ "LAST(e1.rowtime) as customer_event_time, " + "e2.fromUid as empUid, " + "1 as noreply_counts, " + "e1.rowtime as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1 e2) " + "DEFINE " + "e1 as e1.type = ‘yonghu‘, " + "e2 as e2.type = ‘guanjia‘ " + ")"+ "" ); DataStream appendStream =tableEnv.toAppendStream(tb2, Row.class); System.out.println("schema is:"); tb2.printSchema(); appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE); logger.info("stream end"); Table tb3 = tableEnv.sqlQuery("select sessionId, type from myTable"); DataStream temp =tableEnv.toAppendStream(tb3, Row.class); tb3.printSchema(); temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE); env.execute("msg test"); }

大功告成,其实里面坑很多。

注意:如果使用了 TimeCharacteristic.EventTime, 请不用再使用procTime。

flink sql-clent MATCH_RECOGNIZE kafka 例子


推荐阅读
  • 在 Kubernetes 中,Pod 的调度通常由集群的自动调度策略决定,这些策略主要关注资源充足性和负载均衡。然而,在某些场景下,用户可能需要更精细地控制 Pod 的调度行为,例如将特定的服务(如 GitLab)部署到特定节点上,以提高性能或满足特定需求。本文深入解析了 Kubernetes 的亲和性调度机制,并探讨了多种优化策略,帮助用户实现更高效、更灵活的资源管理。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • 在Python中,是否可以通过使用Tkinter或ttk库创建一个具有自动换行功能的多行标签,并使其宽度能够随着父容器的变化而动态调整?例如,在调整NotePad窗口宽度时,实现类似记事本的自动换行效果。这种功能在设计需要显示长文本的对话框时非常有用,确保文本内容能够完整且美观地展示。 ... [详细]
  • 在Cisco IOS XR系统中,存在提供服务的服务器和使用这些服务的客户端。本文深入探讨了进程与线程状态转换机制,分析了其在系统性能优化中的关键作用,并提出了改进措施,以提高系统的响应速度和资源利用率。通过详细研究状态转换的各个环节,本文为开发人员和系统管理员提供了实用的指导,旨在提升整体系统效率和稳定性。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • MATLAB字典学习工具箱SPAMS:稀疏与字典学习的详细介绍、配置及应用实例
    SPAMS(Sparse Modeling Software)是一个强大的开源优化工具箱,专为解决多种稀疏估计问题而设计。该工具箱基于MATLAB,提供了丰富的算法和函数,适用于字典学习、信号处理和机器学习等领域。本文将详细介绍SPAMS的配置方法、核心功能及其在实际应用中的典型案例,帮助用户更好地理解和使用这一工具箱。 ... [详细]
  • 利用 Python Socket 实现 ICMP 协议下的网络通信
    在计算机网络课程的2.1实验中,学生需要通过Python Socket编程实现一种基于ICMP协议的网络通信功能。与操作系统自带的Ping命令类似,该实验要求学生开发一个简化的、非标准的ICMP通信程序,以加深对ICMP协议及其在网络通信中的应用的理解。通过这一实验,学生将掌握如何使用Python Socket库来构建和解析ICMP数据包,并实现基本的网络探测功能。 ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • 如何使用ES6语法编写Webpack配置文件? ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本指南详细介绍了在Linux环境中高效连接MySQL数据库的方法。用户可以通过安装并使用`mysql`客户端工具来实现本地连接,具体命令为:`mysql -u 用户名 -p 密码 -h 主机`。例如,使用管理员账户连接本地MySQL服务器的命令为:`mysql -u root -p pass`。此外,还提供了多种配置优化建议,以确保连接过程更加稳定和高效。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • AIX编程挑战赛:AIX正方形问题的算法解析与Java代码实现
    在昨晚的阅读中,我注意到了CSDN博主西部阿呆-小草屋发表的一篇文章《AIX程序设计大赛——AIX正方形问题》。该文详细阐述了AIX正方形问题的背景,并提供了一种基于Java语言的解决方案。本文将深入解析这一算法的核心思想,并展示具体的Java代码实现,旨在为参赛者和编程爱好者提供有价值的参考。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • Android目录遍历工具 | AppCrawler自动化测试进阶(第二部分):个性化配置详解
    终于迎来了“足不出户也能为社会贡献力量”的时刻,但有追求的测试工程师绝不会让自己的生活变得乏味。与其在家消磨时光,不如利用这段时间深入研究和提升自己的技术能力,特别是对AppCrawler自动化测试工具的个性化配置进行详细探索。这不仅能够提高测试效率,还能为项目带来更多的价值。 ... [详细]
author-avatar
Christy-1221
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有