热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

4.3.5Flink流处理框架FlinkCDC数据实时数据同步FlinkCDC实操FlinkSQL方式

目录1.写在前面2.Maven依赖3.代码实现-普通实现4.集群测试4.1环境准备4.2查看任务结果1.写在前面FlinkCDC有两种实现方式,一种是Da

目录

1.写在前面

2.Maven依赖

3.代码实现-普通实现

4.集群测试

4.1 环境准备

4.2 查看任务结果




1.写在前面

        Flink CDC有两种实现方式,一种是DataStream,另一种是FlinkSQL方式。


  • DataStream方式:优点是可以应用于多库多表,缺点是需要自定义反序列化器(灵活)
  • FlinkSQL方式:优点是不需要自定义反序列化器,缺点是只能应用于单表查询

2.Maven依赖

org.apache.flinkflink-java1.12.7org.apache.flinkflink-streaming-java_2.121.12.7org.apache.flinkflink-clients_2.121.12.7org.apache.hadoophadoop-client2.7.7mysqlmysql-connector-java5.1.49com.alibaba.ververicaflink-connector-mysql-cdc1.2.0com.alibabafastjson1.2.75org.apache.flinkflink-table-planner-blink_2.121.12.7org.apache.maven.pluginsmaven-assembly-plugin3.0.0jar-with-dependenciesmake-assemblypackagesingleorg.apache.maven.pluginsmaven-compiler-plugin88

3.代码实现-普通实现

package com.atguigu;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class FlinkCDCWithSQL {public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.DDL方式建表,flink_sql的方式只能监控一张表tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +" id STRING NOT NULL, " +" tm_name STRING, " +" logo_url STRING " +") WITH ( " +" 'connector' = 'mysql-cdc', " +" 'hostname' = '192.168.0.111', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'gmall2021', " +" 'table-name' = 'base_trademark' " +")");//3.查询数据Table table = tableEnv.sqlQuery("select * from mysql_binlog");//4.将动态表转换为流DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);retractStream.print();//5.启动任务env.execute("FlinkCDCWithSQL");}}

4.集群测试


4.1 环境准备


  1. 启动ha-hadoop集群:sh ha-hadoop.sh start
  2. 创建作业归档目录,只需要创建一次:hdfs dfs -mkdir /flink-jobhistory

  3. 启动Flink集群和任务历史服务

    1. start-cluster.sh
    2. historyserver.sh start
  4. 运行该Flink任务:/opt/software/flink-1.12.7/bin/flink run -m yarn-cluster -ys 1 -ynm gmall-flink-cdc -c com.ucas.FlinkCDCWithCustomerDeserialization -d /root/mySoftware/gmall-flink-cdc.jar

4.2 查看任务结果

(1)打开yarn,查看任务:http://192.168.0.112:8088/cluster/apps,并且通过id点击进去

(2)点击Tracking URL,进入FlinkWeb界面

 (3) 打开左侧TaskManagers中的Stdout查看控制台输出信息


推荐阅读
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 本文介绍了将mysql从5.6.15升级到5.7.15的详细步骤,包括关闭访问、备份旧库、备份权限、配置文件备份、关闭旧数据库、安装二进制、替换配置文件以及启动新数据库等操作。 ... [详细]
  • 本文介绍了如何清除Eclipse中SVN用户的设置。首先需要查看使用的SVN接口,然后根据接口类型找到相应的目录并删除相关文件。最后使用SVN更新或提交来应用更改。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 1、问题?项目打包报错;程序包com.sun.image.codec.jpeg不存在;2、原因尚不明确;由于jdk升级问题。才出现的,可能jdk6就不会出现;初步怀疑jdk的问题; ... [详细]
  • 商品信息|都会_淘淘商城8.6
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了淘淘商城---8.6相关的知识,希望对你有一定的参考价值。继续八月五号写的,今天晚上花点时间开发这个项目& ... [详细]
  • 参照www.Micro_Studios.com的视频,在Ubuntu中成功安装了opencv,并且测试成功。现把具体的安装及测试过程整理出来࿰ ... [详细]
  • 数据库异常智能分析与诊断
    数据库,异常, ... [详细]
author-avatar
手机用户2502931823
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有