最近收到需求,调研如何实时同步oracle数据到kafka,这里先做一个粗略的总结,然后简单介绍下如何使用StreamSets完成需求。
先说下我所知道的几种可行方案:
官网:https://streamsets.com/
中文站:https://streamsets.com/(不确定是否是中文官网)
StreamSets Data Collector是StreamSets下的一个子产品,先来一段中文站上的介绍感受下它的自信:
StreamSets Data Collector(SDC)是目前最先进的可视化数据采集配置工具,非常适合做实时的数据采集,兼顾批量数据采集和不落地的数据ETL。如果您正在使用Flume、Logstash、Sqoop、Canal等上一代数据采集工具,推荐您使用SDC作为升级替换。
它有以下几个特点:
参考:https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Installation/Install_title.html
StreamSets Data Collector提供了多种安装方式,这里演示第四种Core Installation,安装部分核心功能并手动启动,其他功能可按需在界面上操作安装。
https://archives.streamsets.com/datacollector/3.15.0/tarball/activation/streamsets-datacollector-core-3.15.0.tgz
$> tar -zxvf streamsets-datacollector-core-3.15.0.tgz -C ~/ #解压到根目录
$> cd ~ # 去到根目录
$> mv streamsets-datacollector-3.15.0/ streamsets-datacollector/ # 重命名
$> cd ~
$> mkdir -p sdc/conf
$> mkdir -p sdc/data
$> mkdir -p sdc/log
$> mkdir -p sdc/resources
在安装目录下libexec/sdc-env.sh中指定conf、data、log和resources的路径,即上面我们新建的文件夹路径
$> vim ~/streamsets-datacollector/libexec/sdc-env.sh
# directory where the data collector will store pipelines and their runtime information
export SDC_DATA=~/sdc/data
# directory where the data collector write its logs
export SDC_LOG=~/sdc/sdc
# directory where the data collector will read its configuration
export SDC_COnF=~/sdc/conf
# directory where the data collector will read pipeline resource files from
export SDC_RESOURCES=~/sdc/resources
$> cp ~/streamsets-datacollector/etc/* ~/sdc/conf/
$> cd ~/streamsets-datacollector
$> bin/streamsets dc
如果出现下面类似的内容则表示启动成功
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Bypass activation because SDC contains only basic stage libraries.
Logging initialized @1028ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://bigdata02:18630'
启动成功后就可以在浏览器访问StreamSets Data Collector的可视化界面了,地址就是上面最后一行的URI,用户名和密码都是admin
常见问题:
Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd
解决办法:
在/etc/security/limits.conf文件末尾加上下面两行(不要漏了*),后面的数字大于32768就行:
* soft nofile 102400
* hard nofile 102400
断开系统终端,重新连接,然后再次启动streamset就可以了
三、StreamSets Data Collector实战:实时同步oracle数据到kafka这里简单演示一个StreamSets Data Collector(SDC)中构建Pipeline的案例,实时同步oracle数据到kafka,用到的Origin是Oracle CDC Client,Destination是Kafka Producer,不使用Processor。SDC还有很多Origins和Destinations,可以构建出各种你想要的Pipeline,除了各个数据组件的自身配置可能会有差异,总体流程还是一样的。
Oracle CDC Client通过Oracle LogMiner解析redo logs获取变化数据信息,适合实时同步增量和变量数据。如果还想获取Oracle中的原始数据,可以使用JDBC Query Consumer或JDBC Multitable Consumer源另外再搭建一个单独的Pipeline。
Oracle CDC Client的使用可以参考:
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/OracleCDC.html#concept_rs5_hjj_tw
https://streamsets.com/blog/replicating-oracle-to-mysql-and-json/#ora-01291-missing-logfile
因为涉及到Oracle CDC Client中的相关配置,这里简单科普一点Oracle redo log和LogMiner的相关知识(参考官网资料)。
Redo log是Oracle的操作记录日志,记录了所有对用户数据和数据字典的改变,用以备份和恢复数据。LogMiner是Oracle开发的日志挖掘工具,可以解析redo log,分析出对数据的DML和DDL操作。为了解析redo log,LogMinder需要用到一个字典(LogMiner dictionary)。LogMiner使用该字典来解析表名和字段名。如果没有字典,解析出来的表名和字段名就只是内部的object ID,数据是二进制码。
LogMiner可以从下面三个地方获取字典,有不同的效果和性能。
Oracle CDC Client中需要配置LogMiner dictionary来源,提供了online catlog和redo log两个选项。对于这两个选项,官方文档提示:
Important: When using the dictionary in redo logs, make sure to extract the latest dictionary to the redo logs each time table structures change.
Note that using the dictionary in redo logs can have significantly higher latency than using the dictionary in the online catalog. But using the online catalog does not allow for schema changes.
就是说:
a. 如果使用redo logs中的字典,那在每次表结构改变后需要重新将字典提取到redo logs中。
b. 效率上,使用redo logs中的字典会比使用online catlog中的字典有更高的延迟,但是使用online catlog中的字典无法获取表结构更改。
需要自己根据需求取舍。
a. 使用有DBA权限的用户登录Oracle
b. 检查数据库日志模式
SQL> select log_mode from v$database;
如果结果是ARCHIVELOG,可以转至Task2。
如果结果是NOARCHIVELOG,执行下面SQL命令
SQL> shutdown immediate; # Shut down the database
SQL> startup mount; # Start up and mount the database
SQL> alter database archivelog; # enable archiving
SQL> alter database open; # open the database
为了从日志里获取数据,LogMiner需要启用数据库或表的supplemental logging。Supplemental logging又分为
identification key logging和full supplemental logging。Identification key logging只包含主键和发生改变的字段数据,而full supplemental logging包含了所有字段的数据,这两种根据需求选择一种即可。
a. 启用Identification key logging或full supplemental logging
如果你的Oracle版本是12c 或 18c multitenant,最好是为表容器启用日志,而不是整个数据库。
ALTER SESSION SET COnTAINER=
启用Identification key logging
可以指定一张表启用
也可以同时为数据库中所有表启用 启用full supplemental logging 为数据库中所有表启用 b. 提交修改 为了方便,我这里直接为所有表启用full supplemental logging。这里还是需要DBA用户操作。 在CDC Client源中需要配置账户信息连接Oracle,要求该用户有一定权限,所以这里新建一个用户。不同的Oracle版本有不同的方式。 Oracle 12c或18c 多租户版本 Oracle 12c或18c标准版本 Oracle 11g 我的是Oracle 11g,创建TEST_SDC用户,密码为123456。注意,这里同样需要使用具有DBA权限的用户操作。 当你希望LogMiner使用来自redo log中的字典时,需要执行此步骤,将字典提取到redo log中,且必须在启动pipeline之前执行。如果使用online catlog中的字典,则不需要。 如果oracle版本是11g、12c或者18c,执行下面的的命令: 如果是12c或者18c多租户版本 由于我安装的StreamSets Data Collector(SDC)是基础(core)版本,不包含oracle和kafka相关的组件,所以这里需要单独安装。如果是Full Installation应该就不需了,应该会包含所有的Origins和Destinations,但这个安装包太大了,我没有尝试。 为了对接oracle和kafka,这里需要在SDC里面安装JDBC和CDH Kafka 4.1.0(里面还有个Apache Kafka,但是一直没有安装成功)。安装方法有很多种,可以使用RPM或命令行安装,也可以在SDC管理界面中的Package Manager安装,可以参考官方教程Install Additional Stage Libraries。我这里演示使用Package Manager安装。 进入Package Manager 安装JDBC 安装Kafka 安装JDBC驱动 JDBC驱动需要通过安装外部安装包的方式安装,可以参考官方教程。 新建个文件夹,作为外部包的安装路径 在~/streamsets-datacollector/libexec/sdc-env.sh中加入配置,将 在~/sdc/conf/sdc-security.policy中添加: 下载JDBC驱动,我的是oracle 11g,下载的是ojdbc6,下载地址https://www.oracle.com/database/technologies/jdbcdriver-ucp-downloads.html 安装 新建Pipeline create new pipeline -> 填写Title -> Save 配置Pipeline 配置Oracle CDC Client 这里只说明一些必须的配置项,其他配置项没有测试,详细说明参考官网。 Oracle CDC 配置Kafka 这里只是简单配置,更详细内容可以参考官网。 Kafka 启动pipeline。 启动kafka消费端 bootstrap-server与topic参数必须与上面kafka中配置相同。 测试 这里分别向上面配置过的两张表SCOTT.EMP和SCOTT.DEPT插入数据。这里一定要记得commit。 在pipeline页面上可以看到输入了2条数据,输出了2条数据,有0条错误。 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
指定一张表启用 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SQL> ALTER SYSTEM SWITCH LOGFILE;3. 创建用户
ALTER SESSION SET COnTAINER=cdb$root;
CREATE USER
GRANT create session, alter session, set container, logmining, execute_catalog_role TO
GRANT select on GV_$DATABASE to
GRANT select on V_$LOGMNR_CONTENTS to
GRANT select on GV_$ARCHIVED_LOG to
ALTER SESSION SET COnTAINER=
GRANT select on TO
CREATE USER
GRANT create session, alter session, logmining, execute_catalog_role TO
GRANT select on GV_$DATABASE to
GRANT select on V_$LOGMNR_CONTENTS to
GRANT select on GV_$ARCHIVED_LOG to
GRANT select on TO
CREATE USER
GRANT create session, alter session, execute_catalog_role, select any transaction, select any table to
GRANT select on GV_$DATABASE to
GRANT select on GV_$ARCHIVED_LOG to
GRANT select on V_$LOGMNR_CONTENTS to
GRANT select on TO
SQL> create user TEST_SDC identified by 123456;
SQL> grant create session, alter session, execute_catalog_role, select any transaction, select any table to TEST_SDC;
SQL> grant select on GV_$ARCHIVED_LOG to TEST_SDC;
SQL> grant select on V_$LOGMNR_CONTENTS to TEST_SDC;4. 提取LogMiner Dictionary到redo log (可选)
EXECUTE DBMS_LOGMNR_D.BUILD(OPTIOnS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);
ALTER SESSION SET COnTAINER=cdb$root;
EXECUTE DBMS_LOGMNR_D.BUILD(OPTIOnS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);搭建Oracle到Kafka的Pipeline
1. 安装相关组件
第一次安装需要注册,按要求填好信息并进入邮箱验证就可以了。
需要注意的是,安装过程可能会因为网络原因安装失败,想再次安装的时候会报错,提示已经安装了。这时候需要去到SDC安装目录下的streamsets-libs中,把刚刚安装失败组件的文件夹删掉,然后在重新安装。 $> mkdir ~/sdc/external-libs
export STREAMSETS_LIBRARIES_EXTRA_DIR="
// user-defined external directory
grant codebase "file:///
permission java.security.AllPermission;
};
重启SDC。2. 构建Pipeline
Select Origin,选择Oracle CDC Client -> Select Destination,选择Kafka Producer
这里可以配置一些pipeline的信息,如名称、执行模式(Standalone、Cluster)等。有一个必须用户指定的配置就是错误数据处理。可以忽略、写入到文件或者写入到其他pipeline等。我这里为了方便直接忽略了。
我这里使用了SCOTT用户下面的EMP和DEPT两张表做测试。
我使用的是Online Catalog。
JDBC
Credentials
其他配置使用默认就行。3. 测试
直接在pipeline界面点击start按钮。 $> kafka-console-consumer
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
--topic oracle2kafka SQL> insert into SCOTT.DEPT values(65, 'a', 'b');
SQL> commit;
SQL> insert into scott.emp values(1234, 'Mike', 'SALESMAN', 7698, '', 1500, 0, 30);
SQL> commit;
在kafka的消费端也可以看到刚刚插入数据库的2条数据
至此,oracle到kafka的pipeline就搭建完成了。
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有