热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka数据同步Elasticsearch深入详解

kafka数据同步Elasticsearch深入详解,Go语言社区,Golang程序员人脉社

1、kafka同步到Elasticsearch方式?

目前已知常用的方式有四种:
1)logstash_input_kafka插件;
缺点:不稳定(ES中文社区讨论)
2)spark stream同步;
缺点:太庞大
3)kafka connector同步;
4)自写程序读取、解析、写入
这里写图片描述
本文主要基于kafka connector实现kafka到Elasticsearch全量、增量同步。

2、从confluenct说起

LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka做的。
Confluent Platform简化了连接数据源到Kafka,用Kafka构建应用程序,以及安全,监控和管理您的Kafka的基础设施。
confluent组成如下所示:
这里写图片描述

1)Apache Kafka
消息分发组件,数据采集后先入Kafka。
2)Schema Registry
Schema管理服务,消息出入kafka、入hdfs时,给数据做序列化/反序列化处理。
3)Kafka Connect
提供kafka到其他存储的管道服务,此次焦点是从kafka到hdfs,并建立相关HIVE表。
4)Kafka Rest Proxy
提供kafka的Rest API服务。
5)Kafka Clients
提供Client编程所需SDK。

默认端口对应表:

组件 | 端口

Apache Kafka brokers (plain text):9092

Confluent Control Center:9021

Kafka Connect REST API:8083

REST Proxy:8082

Schema Registry REST API:8081

ZooKeeper:2181

3、kafka connector介绍。

Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型。

通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。

Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。

而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、 Elastic Search、 Apache Ignite等。

KafkaConnect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。

kafkaConnect通过Jest实现Kafka对接Elasticsearch。

4、kafka connector安装

实操非研究性的目的,不建议源码安装。
直接从官网down confluent安装即可。地址:https://www.confluent.io/download/

如下,解压后既可以使用。

[root@kafka_no1 confluent-3.3.0]# pwd
/home/confluent/confluent-3.3.0

[root@kafka_no1 confluent-3.3.0]# ls -al
total 32
drwxrwxr-x. 7 root root 4096 Dec 16 10:08 .
drwxr-xr-x. 3 root root 4096 Dec 20 15:34 ..
drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bin
drwxr-xr-x. 18 root root 4096 Jul 28 08:30 etc
drwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs
-rw-rw-r--. 1 root root 871 Jul 28 08:45 README
drwxr-xr-x. 10 root root 4096 Jul 28 08:30 share
drwxrwxr-x. 2 root root 4096 Jul 28 08:45 src

5、kafka connector模式

Kafka connect 有两种工作模式
1)standalone:在standalone模式中,所有的worker都在一个独立的进程中完成。

2)distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。

6、kafka connector同步步骤

前提:

$ confluent start

如下的服务都需要启动:

Starting zookeeper
zookeeper is [UP] ——对应端口:2181
Starting kafka
kafka is [UP]——对应端口:9092
Starting schema-registry
schema-registry is [UP]——对应端口:8081
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

可以,netstat -natpl 查看端口是否监听ok。

步骤1:创建topic

./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic  test-elasticsearch-sink

步骤2:生产者发布消息

假定avrotest topic已经创建。

./kafka-avro-console-producer  --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink 
         --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

步骤3:消费者订阅消息测试(验证生产者消息可以接收到)

./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic  test-elasticsearch-sink --from-beginning

步骤4:connector传输数据操作到ES

./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties 
../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

注意此处: connect-standalone模式,对应 connect-avro-standalone.properties要修改;
如果使用connect-distribute模式,对应的connect-avro-distribute.properties要修改。
这里 quickstart-elasticsearch.properties :启动到目的Elasticsearch配置。

quickstart-elasticsearch.properties**设置**:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#kafka主题名称,也是对应Elasticsearch索引名称
topics= test-elasticsearch-sink

key.ignore=true
#ES url信息
connection.url=http://110.18.6.20:9200
#ES type.name固定
type.name=kafka-connect

7、同步效果。

这里写图片描述

curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'

8、连接信息查询REST API
    -
GET /connectors – 返回所有正在运行的connector名。
- POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
- GET /connectors/{name} – 获取指定connetor的信息。
- GET /connectors/{name}/config – 获取指定connector的配置信息。
- PUT /connectors/{name}/config – 更新指定connector的配置信息。
- GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
- GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
- GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
- PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
- PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
- POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
- POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
- DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

9、小结。

他山之石,可以攻玉。
kafka上的小学生,继续加油!

参考:

[1]kafka-connect部署及简介:http://t.cn/RiUCaWx
[2]connector介绍:http://orchome.com/344
[3]英文-同步介绍http://t.cn/RYeZm7P
[4]部署&开发http://t.cn/RTeyOEl
[5]confluent生态链http://t.cn/RTebVyL
[6]快速启动参考:https://docs.confluent.io/3.3.0/quickstart.html
[7]ES-connector:http://t.cn/RTecXmc

——————————————————————————————————
更多ES相关实战干货经验分享,请扫描下方【铭毅天下】微信公众号二维码关注。
(每周至少更新一篇!)

这里写图片描述
和你一起,死磕Elasticsearch
——————————————————————————————————

2017.12.21 23:24 于家中床前

作者:铭毅天下
转载请标明出处,原文地址:
http://blog.csdn.net/laoyang360/article/details/78868806
如果感觉本文对您有帮助,请点击‘顶’支持一下,您的支持是我坚持写作最大的动力,谢谢!


推荐阅读
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • Android中将独立SO库封装进JAR包并实现SO库的加载与调用
    在Android开发中,将独立的SO库封装进JAR包并实现其加载与调用是一个常见的需求。本文详细介绍了如何将SO库嵌入到JAR包中,并确保在外部应用调用该JAR包时能够正确加载和使用这些SO库。通过这种方式,开发者可以更方便地管理和分发包含原生代码的库文件,提高开发效率和代码复用性。文章还探讨了常见的问题及其解决方案,帮助开发者避免在实际应用中遇到的坑。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 字节跳动深圳研发中心安全业务团队正在火热招募人才! ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文介绍了如何使用Hive分析用户最长连续登录天数的方法。首先对数据进行排序,然后计算相邻日期之间的差值,接着按用户ID分组并累加连续登录天数,最后求出每个用户的最大连续登录天数。此外,还探讨了该方法在其他领域的应用,如股票市场中最大连续涨停天数的分析。 ... [详细]
  • sh cca175problem03evolveavroschema.sh ... [详细]
author-avatar
yangxinhui2602905795
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有