热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用kafka连接器迁移mysql数据到ElasticSearch

文章目录概述过程详解准备连接器工具数据库和ES环境准备配置连接器启动测试概述把mysql的数据迁移到es有很多方式,比如直接用es官方推荐的logstash工具&#


文章目录

    • 概述
    • 过程详解
      • 准备连接器工具
      • 数据库和ES环境准备
      • 配置连接器
      • 启动测试


概述

把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 canal。

这里打算详细介绍另一个也是不错的同步方案,这个方案基于 kafka 的连接器。流程可以概括为:


  1. mysql连接器监听数据变更,把变更数据发送到 kafka topic。
  2. ES 监听器监听kafka topic 消费,写入 ES。

Kafka Connect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector,也就是连接器。在本例中,mysql的连接器是source,es的连接器是sink。

这些连接器本身已经开源,我们之间拿来用即可。不需要再造轮子。


过程详解


准备连接器工具

我下面所有的操作都是在自己的mac上进行的。

首先我们准备两个连接器,分别是 kafka-connect-elasticsearchkafka-connect-elasticsearch, 你可以通过源码编译他们生成jar包,源码地址:

kafka-connect-elasticsearch

kafka-connect-mysql

我个人不是很推荐这种源码的编译方式,因为真的好麻烦。除非想研究源码。

我是直接下载 confluent 平台的工具包,里面有编译号的jar包可以直接拿来用,下载地址:

confluent 工具包

我下载的是 confluent-5.3.1 版本, 相关的jar包在 confluent-5.3.1/share/java 目录下

我们把编译好的或者下载的jar包拷贝到kafka的libs目录下。拷贝的时候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相关的依赖包也要一起拷贝过来,比如es这个jar包目录下的http相关的,jersey相关的等,否则会报各种 java.lang.NoClassDefFoundError 的错误。

另外mysql-connector-java-5.1.22.jar也要放进去。


数据库和ES环境准备

数据库和es我都是在本地启动的,这个过程具体就不说了,网上有很多参考的。

我创建了一个名为test的数据库,里面有一个名为login的表。


配置连接器

这部分是最关键的,我实际操作的时候这里也是最耗时的。

首先配置jdbc的连接器。

我们从confluent工具包里拷贝一个配置文件的模板(confluent-5.3.1/share目录下),自带的只有sqllite的配置文件,拷贝一份到kafka的config目录下,改名为sink-quickstart-mysql.properties,文件内容如下:

# tasks to create:
name=mysql-login-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111
mode=timestamp+incrementing
timestamp.column.name=login_time
incrementing.column.name=id
topic.prefix=mysql.
table.whitelist=login

connection.url指定要连接的数据库,这个根据自己的情况修改。mode指示我们想要如何查询数据。在本例中我选择incrementing递增模式和timestamp 时间戳模式混合的模式, 并设置incrementing.column.name递增列的列名和时间戳所在的列名。

** 混合模式还是比较推荐的,它能尽量的保证数据同步不丢失数据。**具体的原因大家可以查阅相关资料,这里就不详述了。

topic.prefix是众多表名之前的topic的前缀,table.whitelist是白名单,表示要监听的表,可以使组合多个表。两个组合在一起就是该表的变更topic,比如在这个示例中,最终的topic就是mysql.login。

connector.class是具体的连接器处理类,这个不用改。

其它的配置基本不用改。

接下来就是ES的配置了。同样也是拷贝 quickstart-elasticsearch.properties 文件到kafka的config目录下,然后修改,我自己的环境内容如下:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql.login
key.ignore=true
connection.url=http://localhost:9200
type.name=mysqldata

topics的名字和上面mysql设定的要保持一致,同时这个也是ES数据导入的索引。从里也可以看出,ES的连接器一个实例只能监听一张表。

type.name需要关注下,我使用的ES版本是7.1,我们知道在7.x的版本中已经只有一个固定的type(_doc)了,使用低版本的连接器在同步的时候会报错误,我这里使用的5.3.1版本已经兼容了。继续看下面的章节就知道了。

关于es连接器和es的兼容性问题,有兴趣的可以看看下面这个issue:

https://github.com/confluentinc/kafka-connect-elasticsearch/issues/314


启动测试

当然首先启动zk和kafka。

然后我们启动mysql的连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接着手动往login表插入几条记录,正常情况下这些变更已经发到kafka对应的topic上去了。为了验证,我们在控制台启动一个消费者从mysql.login主题读取数据:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

可以看到刚才插入的数据。

把数据从 MySQL 移动到 Kafka 里就算完成了,接下来把数据从 Kafka 写到 ElasticSearch 里。

首先启动ES和kibana,当然后者不是必须的,只是方便我们在IDE环境里测试ES。你也可以通过控制台给ES发送HTTP的指令。

先把之前启动的mysql连接器进程结束(因为会占用端口),再启动 ES 连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

如果正常的话,ES这边应该已经有数据了。打开kibana的开发工具,在console里执行

GET _cat/indices

这是获取节点上所有的索引,你应该能看到,

green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb

说明索引已经正常创建了。然后我们查询下,

GET mysql.login/_search?pretty=true

结果如下,

{"took" : 1,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 4,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+0","_score" : 1.0,"_source" : {"id" : 1,"username" : "lucas1","login_time" : 1575870785000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+1","_score" : 1.0,"_source" : {"id" : 2,"username" : "lucas2","login_time" : 1575870813000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+2","_score" : 1.0,"_source" : {"id" : 3,"username" : "lucas3","login_time" : 1575874031000}},{"_index" : "mysql.login","_type" : "mysqldata","_id" : "mysql.login+0+3","_score" : 1.0,"_source" : {"id" : 4,"username" : "lucas4","login_time" : 1575874757000}}]}
}


参考:

1.《kafka权威指南》
2. https://www.jianshu.com/p/46b6fa53cae4


推荐阅读
  • 本文详细介绍了如何在 Linux 系统上安装 JDK 1.8、MySQL 和 Redis,并提供了相应的环境配置和验证步骤。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文深入探讨了如何利用Maven高效管理项目中的外部依赖库。通过介绍Maven的官方依赖搜索地址(),详细讲解了依赖库的添加、版本管理和冲突解决等关键操作。此外,还提供了实用的配置示例和最佳实践,帮助开发者优化项目构建流程,提高开发效率。 ... [详细]
  • 利用爬虫技术抓取数据,结合Fiddler与Postman在Chrome中的应用优化提交流程
    本文探讨了如何利用爬虫技术抓取目标网站的数据,并结合Fiddler和Postman工具在Chrome浏览器中的应用,优化数据提交流程。通过详细的抓包分析和模拟提交,有效提升了数据抓取的效率和准确性。此外,文章还介绍了如何使用这些工具进行调试和优化,为开发者提供了实用的操作指南。 ... [详细]
  • 通过利用代码自动生成技术,旨在减轻软件开发的复杂性,缩短项目周期,减少冗余代码的编写,从而显著提升开发效率。该方法不仅能够降低开发人员的工作强度,还能确保代码的一致性和质量。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 基于Dubbo与Zipkin的微服务调用链路监控解决方案
    本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 为什么多数程序员难以成为架构师?
    探讨80%的程序员为何难以晋升为架构师,涉及技术深度、经验积累和综合能力等方面。本文将详细解析Tomcat的配置和服务组件,帮助读者理解其内部机制。 ... [详细]
  • 在将Web服务器和MySQL服务器分离的情况下,是否需要在Web服务器上安装MySQL?如果安装了MySQL,如何解决PHP连接MySQL服务器时出现的连接失败问题? ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
author-avatar
昆明DVD导航
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有