热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

怎样将MySQL数据表导入到Elasticsearch

MySQL导入Elasticsearch的方法有很多,通常是使用ETL工具,但我觉得太麻烦。于是想

本文节选自《Netkiller Database 手札》


MySQL 导入 Elasticsearch 的方法有很多,通常是使用ETL工具,但我觉得太麻烦。于是想到 logstash 。



23.8. Migrating MySQL Data into Elasticsearch using logstash


23.8.1. 安装 logstash

安装 JDBC 驱动 和 Logstash 

curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh | bash
curl -s https://raw.githubusercontent.com/oscm/shell/master/log/kibana/logstash-5.x.sh | bash

mysql 驱动文件位置在 /usr/share/java/mysql-connector-java.jar 

23.8.2. 配置 logstash

创建配置文件 /etc/logstash/conf.d/jdbc-mysql.conf

mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id | int(11) | NO | | 0 | |
| title | mediumtext | NO | | NULL | |
| description | mediumtext | YES | | NULL | |
| author | varchar(100) | YES | | NULL | |
| source | varchar(100) | YES | | NULL | |
| ctime | datetime | NO | | NULL | |
| content | longtext | YES | | NULL | |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from article"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "article"
document_id => "%{id}"
}
}

23.8.3. 启动 Logstash

root@netkiller /var/log/logstash % systemctl restart logstash
root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
Main PID: 10434 (java)
CGroup: /system.slice/logstash.service
└─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Started logstash.
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Starting logstash...
root@netkiller /var/log/logstash % cat logstash-plain.log
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc ] (0.006000s) select * from article

23.8.4. 验证

% curl -XGET 'http://localhost:9200/_all/_search?pretty'

23.8.5. 配置模板

23.8.5.1. 全量导入

适合数据没有改变的归档数据或者只能增加没有修改的数据

input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from article"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "article"
document_id => "%{id}"
}
}

23.8.5.2. 多表导入

多张数据表导入到 Elasticsearch

# multiple inputs on logstash jdbc
input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from article"
type => "article"
}
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from comment"
type => "comment"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "%{type}"
document_id => "%{id}"
}
}

需要在每一个jdbc配置项中加入 type 配置,然后 elasticsearch 配置项中加入 document_type => "%{type}" 

23.8.5.3. 通过 ID 主键字段增量复制数据

input {
jdbc {
statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
# ... other configuration bits
}
}

tracking_column_type => "numeric" 可以声明 id 字段的数据类型, 如果不指定将会默认为日期

[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.4. 通过日期字段增量复制数据

input {
jdbc {
statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
use_column_value => true
tracking_column => "create_date"
# ... other configuration bits
}
}

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.5. 指定SQL文件

statement_filepath 指定 SQL 文件,有时SQL太复杂写入 statement 配置项维护部方便,可以将 SQL 写入一个文本文件,然后使用 statement_filepath 配置项引用该文件。

input {
jdbc {
jdbc_driver_library => "/path/to/driver.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_url => "jdbc://postgresql"
jdbc_user => "neo"
jdbc_password => "password"
statement_filepath => "query.sql"
}
}

23.8.5.6. 参数传递

将需要复制的条件参数写入 parameters 配置项

input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}

23.8.5.7. 控制返回JDBC数据量

jdbc_fetch_size => 1000 #jdbc获取数据的数量大小
jdbc_page_size => 1000 #jdbc一页的大小,
jdbc_paging_enabled => true #和jdbc_page_size组合,将statement的查询分解成多个查询,相当于: SELECT * FROM table LIMIT 1000 OFFSET 4000

23.8.6. example

input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
last_run_metadata_path => "/var/tmp/article.last"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "article"
document_id => "%{id}"
action => "update"  # 操作执行的动作,可选值有["index", "delete", "create", "update"]
doc_as_upsert => true #支持update模式
}
}

Donations (打赏)

We accept PayPal through:

https://www.paypal.me/netkiller

Wechat (微信) / Alipay (支付宝) 打赏:

http://www.netkiller.cn/home/donations.html


作者相关文章:

Spring data 数据库建表(一对一,一对多,多对多)

Apache Sqoop 将mysql导入到Hadoop HDFS

Spring boot with Apache Hive

Apache Hive 快速入门

CentOS 7.3 + Server JRE 1.8 + Hadoop-2.8.0

Apache Hbase 快速入门

Spring cloud 之 Feign Client

Spring Cloud Netflix

重新整理AUTO_INCREMENT字段

Spring Cloud Config

Spring boot with Schedule (启用/禁用)

Spring boot with HTTPS SSL

Spring boot with Git version

《Netkiller Virtualization 手札》Docker 卷管理

PHP高级编程之守护进程

Spring boot with Docker

Spring boot with Service

Spring boot with PostgreSQL

Struts2 S2-046, S2-045 Firewall(漏洞防火墙)

数据库与图片完美解决方案

数据库进程间通信解决方案

数据库进程间通信解决方案之MQ

Linux 系统安全与优化配置

Tomcat 安全配置与性能优化

Linux 系统与数据库安全


转载请注明出处与作者声明,扫描二维码关注作者公众好,不定期更新文章




推荐阅读
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • WhenIusepythontoapplythepymysqlmoduletoaddafieldtoatableinthemysqldatabase,itdo ... [详细]
  • 本文主要复习了数据库的一些知识点,包括环境变量设置、表之间的引用关系等。同时介绍了一些常用的数据库命令及其使用方法,如创建数据库、查看已存在的数据库、切换数据库、创建表等操作。通过本文的学习,可以加深对数据库的理解和应用能力。 ... [详细]
  • ZABBIX 3.0 配置监控NGINX性能【OK】
    1.在agent端查看配置:nginx-V查看编辑时是否加入状态监控模块:--with-http_stub_status_module--with-http_gzip_stat ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • Java实战之电影在线观看系统的实现
    本文介绍了Java实战之电影在线观看系统的实现过程。首先对项目进行了简述,然后展示了系统的效果图。接着介绍了系统的核心代码,包括后台用户管理控制器、电影管理控制器和前台电影控制器。最后对项目的环境配置和使用的技术进行了说明,包括JSP、Spring、SpringMVC、MyBatis、html、css、JavaScript、JQuery、Ajax、layui和maven等。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • Python SQLAlchemy库的使用方法详解
    本文详细介绍了Python中使用SQLAlchemy库的方法。首先对SQLAlchemy进行了简介,包括其定义、适用的数据库类型等。然后讨论了SQLAlchemy提供的两种主要使用模式,即SQL表达式语言和ORM。针对不同的需求,给出了选择哪种模式的建议。最后,介绍了连接数据库的方法,包括创建SQLAlchemy引擎和执行SQL语句的接口。 ... [详细]
  • 本文介绍了在使用Laravel和sqlsrv连接到SQL Server 2016时,如何在插入查询中使用输出子句,并返回所需的值。同时讨论了使用CreatedOn字段返回最近创建的行的解决方法以及使用Eloquent模型创建后,值正确插入数据库但没有返回uniqueidentifier字段的问题。最后给出了一个示例代码。 ... [详细]
  • 进入配置文件目录:[rootlinuxidcresin-4.0.]#cdusrlocalresinconf查看都有哪些配置文件:[rootlinuxid ... [详细]
author-avatar
赖-哥_528
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有