热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用kafka连接器迁移mysql数据到ElasticSearch

文章目录概述过程详解准备连接器工具数据库和ES环境准备配置连接器启动测试概述把mysql的数据迁移到es有很多方式,比如直接用es官方推荐的logstash工具&#


文章目录

    • 概述
    • 过程详解
      • 准备连接器工具
      • 数据库和ES环境准备
      • 配置连接器
      • 启动测试


概述

把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 canal。

这里打算详细介绍另一个也是不错的同步方案,这个方案基于 kafka 的连接器。流程可以概括为:


  1. mysql连接器监听数据变更,把变更数据发送到 kafka topic。
  2. ES 监听器监听kafka topic 消费,写入 ES。

Kafka Connect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector,也就是连接器。在本例中,mysql的连接器是source,es的连接器是sink。

这些连接器本身已经开源,我们之间拿来用即可。不需要再造轮子。


过程详解


准备连接器工具

我下面所有的操作都是在自己的mac上进行的。

首先我们准备两个连接器,分别是 kafka-connect-elasticsearchkafka-connect-elasticsearch, 你可以通过源码编译他们生成jar包,源码地址:

kafka-connect-elasticsearch

kafka-connect-mysql

我个人不是很推荐这种源码的编译方式,因为真的好麻烦。除非想研究源码。

我是直接下载 confluent 平台的工具包,里面有编译号的jar包可以直接拿来用,下载地址:

confluent 工具包

我下载的是 confluent-5.3.1 版本, 相关的jar包在 confluent-5.3.1/share/java 目录下

我们把编译好的或者下载的jar包拷贝到kafka的libs目录下。拷贝的时候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相关的依赖包也要一起拷贝过来,比如es这个jar包目录下的http相关的,jersey相关的等,否则会报各种 java.lang.NoClassDefFoundError 的错误。

另外mysql-connector-java-5.1.22.jar也要放进去。


数据库和ES环境准备

数据库和es我都是在本地启动的,这个过程具体就不说了,网上有很多参考的。

我创建了一个名为test的数据库,里面有一个名为login的表。


配置连接器

这部分是最关键的,我实际操作的时候这里也是最耗时的。

首先配置jdbc的连接器。

我们从confluent工具包里拷贝一个配置文件的模板(confluent-5.3.1/share目录下),自带的只有sqllite的配置文件,拷贝一份到kafka的config目录下,改名为sink-quickstart-mysql.properties,文件内容如下:

# tasks to create:
name=mysql-login-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111
mode=timestamp+incrementing
timestamp.column.name=login_time
incrementing.column.name=id
topic.prefix=mysql.
table.whitelist=login

connection.url指定要连接的数据库,这个根据自己的情况修改。mode指示我们想要如何查询数据。在本例中我选择incrementing递增模式和timestamp 时间戳模式混合的模式, 并设置incrementing.column.name递增列的列名和时间戳所在的列名。

** 混合模式还是比较推荐的,它能尽量的保证数据同步不丢失数据。**具体的原因大家可以查阅相关资料,这里就不详述了。

topic.prefix是众多表名之前的topic的前缀,table.whitelist是白名单,表示要监听的表,可以使组合多个表。两个组合在一起就是该表的变更topic,比如在这个示例中,最终的topic就是mysql.login。

connector.class是具体的连接器处理类,这个不用改。

其它的配置基本不用改。

接下来就是ES的配置了。同样也是拷贝 quickstart-elasticsearch.properties 文件到kafka的config目录下,然后修改,我自己的环境内容如下:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql.login
key.ignore=true
connection.url=http://localhost:9200
type.name=mysqldata

topics的名字和上面mysql设定的要保持一致,同时这个也是ES数据导入的索引。从里也可以看出,ES的连接器一个实例只能监听一张表。

type.name需要关注下,我使用的ES版本是7.1,我们知道在7.x的版本中已经只有一个固定的type(_doc)了,使用低版本的连接器在同步的时候会报错误,我这里使用的5.3.1版本已经兼容了。继续看下面的章节就知道了。

关于es连接器和es的兼容性问题,有兴趣的可以看看下面这个issue:

https://github.com/confluentinc/kafka-connect-elasticsearch/issues/314


启动测试

当然首先启动zk和kafka。

然后我们启动mysql的连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接着手动往login表插入几条记录,正常情况下这些变更已经发到kafka对应的topic上去了。为了验证,我们在控制台启动一个消费者从mysql.login主题读取数据:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

可以看到刚才插入的数据。

把数据从 MySQL 移动到 Kafka 里就算完成了,接下来把数据从 Kafka 写到 ElasticSearch 里。

首先启动ES和kibana,当然后者不是必须的,只是方便我们在IDE环境里测试ES。你也可以通过控制台给ES发送HTTP的指令。

先把之前启动的mysql连接器进程结束(因为会占用端口),再启动 ES 连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

如果正常的话,ES这边应该已经有数据了。打开kibana的开发工具,在console里执行

GET _cat/indices

这是获取节点上所有的索引,你应该能看到,

green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb

说明索引已经正常创建了。然后我们查询下,

GET mysql.login/_search?pretty=true

结果如下,

{"took" : 1,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 4,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+0","_score" : 1.0,"_source" : {"id" : 1,"username" : "lucas1","login_time" : 1575870785000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+1","_score" : 1.0,"_source" : {"id" : 2,"username" : "lucas2","login_time" : 1575870813000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+2","_score" : 1.0,"_source" : {"id" : 3,"username" : "lucas3","login_time" : 1575874031000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+3","_score" : 1.0,"_source" : {"id" : 4,"username" : "lucas4","login_time" : 1575874757000}}]}
}


参考:

1.《kafka权威指南》
2. https://www.jianshu.com/p/46b6fa53cae4


推荐阅读
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • ElasticSerach初探第一篇认识ES+环境搭建+简单MySQL数据同步+SpringBoot整合ES
    一、认识ElasticSearch是一个基于Lucene的开源搜索引擎,通过简单的RESTfulAPI来隐藏Lucene的复杂性。全文搜索,分析系统&# ... [详细]
  • 开发笔记:MyBatis学习之逆向工程
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了MyBatis学习之逆向工程相关的知识,希望对你有一定的参考价值。转载:http://w ... [详细]
  • 0x00端口渗透端口扫描端口的指纹信息(版本信息)端口所对应运行的服务常见的默认端口号.尝试弱口令端口爆破hydra端口弱口令NTScanHs ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 本文介绍了将mysql从5.6.15升级到5.7.15的详细步骤,包括关闭访问、备份旧库、备份权限、配置文件备份、关闭旧数据库、安装二进制、替换配置文件以及启动新数据库等操作。 ... [详细]
  • Centos7搭建ELK(Elasticsearch、Logstash、Kibana)教程及注意事项
    本文介绍了在Centos7上搭建ELK(Elasticsearch、Logstash、Kibana)的详细步骤,包括下载安装包、安装Elasticsearch、创建用户、修改配置文件等。同时提供了使用华为镜像站下载安装包的方法,并强调了保证版本一致的重要性。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • 03Spring使用注解方式注入
    基于注解的DI注入1.导包环境搭建:导入aop包(spring-aop-4.1.6.RELEASE.jar)2.创建类3.创建spring.xml配置文件(必须在src目录下)该配 ... [详细]
  • 此版本重点升级了Online代码生成器,支持更多的控件生成,所见即所得,极大的提高开发效率;同时做了数据库兼容专项工作,让Online开发兼容更多数据库:Mysql、SqlServer、Oracle、Postgresql等!!!项目介绍 ... [详细]
  • 架构师必读:日均500万数据,如何进行数据存储选型?
    点击上方关注我,选择“置顶或者星标”作者:麦田里的老农来源:https:zhuanlan.zhihu.comp37964096小编公司有一 ... [详细]
  • ELK stack 学习记录
    ELK日志分析平台学习记录首先ELK主要指elasticsearch、logstash和kibana,三个开源软件组合而成的一套日志平台解决方案。可以将平时收集到的日志,通过前台展 ... [详细]
author-avatar
昆明DVD导航
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有