热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

pyflink实时接收kafka数据至mysql

frompyflink.datasetimportExecutionEnvironmentfrompyflink.tableimportTableConfig,DataTypes,

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table.catalog import HiveCatalog
from pyflink.table import SqlDialect
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)
catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# set the HiveCatalog as the current catalog of the sessionT_env.use_catalog("myhive")
t_env.use_catalog("myhive")
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create a catalog table
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
etl_date STRING
,id BIGINT
,user_id BIGINT
,height DECIMAL(5,2)
,weight DECIMAL(5,2)
)
""")
# should return the tables in current catalog and database.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE source_parent_info(
id bigint
,user_id bigint
,height decimal(5,2)
,weight decimal(5,2)
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")
t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
id
,user_id
,height
,weight
FROM source_parent_info
""").wait()

参考文档:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830



推荐阅读
  • 本文详细介绍了使用 Python 进行 MySQL 和 Redis 数据库操作的实战技巧。首先,针对 MySQL 数据库,通过 `pymysql` 模块展示了如何连接和操作数据库,包括建立连接、执行查询和更新等常见操作。接着,文章深入探讨了 Redis 的基本命令和高级功能,如键值存储、列表操作和事务处理。此外,还提供了多个实际案例,帮助读者更好地理解和应用这些技术。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 针对MySQL Undo空间满载及Oracle Undo表空间溢出的问题,本文详细探讨了其原因与解决策略。首先,通过启动SQL*Plus并以SYS用户身份登录数据库,查询当前数据库的UNDO表空间名称,确认当前状态。接着,分析导致Undo空间满载的常见原因,如长时间运行的事务、频繁的更新操作等,并提出相应的解决方案,包括调整Undo表空间大小、优化事务管理、定期清理历史数据等。最后,结合实际案例,提供具体的实施步骤和注意事项,帮助DBA有效应对这些问题。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 通过使用Sqoop导入工具,可以精确控制并高效地将表数据的特定子集导入到HDFS中。具体而言,可以通过在导入命令中添加WHERE子句来指定所需的数据范围,从而在数据库服务器上执行相应的SQL查询,并将查询结果高效地存储到HDFS中。这种方法不仅提高了数据导入的灵活性,还确保了数据的准确性和完整性。 ... [详细]
  • 本地存储组件实现对IE低版本浏览器的兼容性支持 ... [详细]
  • 如何使用 `org.opencb.opencga.core.results.VariantQueryResult.getSource()` 方法及其代码示例详解 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • Squaretest:自动生成功能测试代码的高效插件
    本文将介绍一款名为Squaretest的高效插件,该工具能够自动生成功能测试代码。使用这款插件的主要原因是公司近期加强了代码质量的管控,对各项目进行了严格的单元测试评估。Squaretest不仅提高了测试代码的生成效率,还显著提升了代码的质量和可靠性。 ... [详细]
author-avatar
mobiledu2502876193
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有