热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用ESHadoop将SparkStreaming流数据写入ES

本文将详细介绍利用ES-Hadoop将Spark处理的数据写入到ES中。一、开发环境1、组件版本CDH集群版本:6.0.1Spark版本:2.2.0Ka

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖


<dependency><groupId>org.scala-langgroupId><artifactId>scala-libraryartifactId><version>2.11.8version>
dependency>
<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-core_2.11artifactId><version>2.2.0version>
dependency>
<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_2.11artifactId><version>2.2.0version>
dependency>
<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming-kafka-0-10_2.11artifactId><version>2.2.0version>
dependency>
<dependency><groupId>org.apache.zookeepergroupId><artifactId>zookeeperartifactId><version>3.4.5-cdh6.0.1version>
dependency>
<dependency><groupId>org.elasticsearchgroupId><artifactId>elasticsearch-spark-20_2.11artifactId><version>6.5.4version>
dependency>
<dependency><groupId>commons-httpclientgroupId><artifactId>commons-httpclientartifactId><version>3.1version>
dependency>

3、注意事项

如果使用 CDH 版本的 Spark&#xff0c;则在调试及实际部署运行的时候会出现下面的错误&#xff1a;

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的&#xff0c;对比开源版本与 CDH 版本的 Spark&#xff0c;发现开源版本多出了 commons-httpclient-3.1.jar&#xff0c;因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态&#xff08;Hive、Spark、Pig、Storm 等&#xff09;与 ElasticSearch 之间的数据交互&#xff0c;借助该组件可以将 Hadoop 生态的数据写入到 ES 中&#xff0c;然后借助 ES 对数据快速进行搜索、过滤、聚合等分析&#xff0c;进一步可以通过 Kibana 来实现数据的可视化。

同时&#xff0c;也可以借助 ES 作为数据存储层&#xff08;类似数仓的 Stage 层或者 ODS 层&#xff09;&#xff0c;然后借助 Hadoop 生态的数据处理工具&#xff08;Hive、MR、Spark 等&#xff09;将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层&#xff0c;可以很好的进行数据去重、数据质量分析&#xff0c;还可以提供一些即时的数据服务&#xff0c;例如趋势展示、汇总分析等。

2、组成

ES-Hadoop 是一个整合性质的组件&#xff0c;它封装了 Hadoop 生态的多种组件与 ES 交互的 API&#xff0c;如果你只需要部分功能&#xff0c;可以使用细分的组件&#xff1a;

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互&#xff0c;下面是几个重要配置项&#xff0c;更多配置信息请参阅官方说明&#xff1a;

  • es.nodes&#xff1a;需要连接的 es 节点&#xff08;不需要配置全部节点&#xff0c;默认会自动发现其他可用节点&#xff09;&#xff1b;
  • es.port&#xff1a;节点 http 通讯端口&#xff1b;
  • es.nodes.discovery&#xff1a;默认为 true&#xff0c;表示自动发现集群可用节点&#xff1b;
  • es.nodes.wan.only&#xff1a;默认为 false&#xff0c;设置为 true 之后&#xff0c;会关闭节点的自动 discovery&#xff0c;只使用 es.nodes 声明的节点进行数据读写操作&#xff1b;如果你需要通过域名进行数据访问&#xff0c;则设置该选项为 true&#xff0c;否则请务必设置为 false&#xff1b;
  • es.index.auto.create&#xff1a;是否自动创建不存在的索引&#xff0c;默认为 true&#xff1b;
  • es.net.http.auth.user&#xff1a;Basic 认证的用户名&#xff1b;
  • es.net.http.auth.pass&#xff1a;Basic 认证的密码。

val conf &#61; new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only&#xff0c;由于在云服务器环境中&#xff0c;配置文件使用的一般为内网地址&#xff0c;而本地调试的时候一般使用外网地址&#xff0c;这样将 es.nodes 配置为外网地址后&#xff0c;最后会出现节点找不到的问题&#xff08;由于会使用节点配置的内网地址去进行连接&#xff09;&#xff1a;

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available;
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名&#xff0c;集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复&#xff0c;写入 ES 时往往会出现数据写入冲突的错误&#xff0c;此时有两种解决方法。

方法一&#xff1a;设置 es.write.operation 为 upsert&#xff0c;这样达到的效果为如果存在则更新&#xff0c;不存在则进行插入&#xff0c;该配置项默认值为 index。

方法二&#xff1a;自定义冲突处理类&#xff0c;类似上述配置中设置了自定义的 error.handlers&#xff0c;通过自定义类来处理相关错误&#xff0c;例如忽略冲突等&#xff1a;

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {if (entry.getResponseCode() &#61;&#61; 409) {StaticLog.warn("Encountered conflict response. Ignoring old data.");return HandlerResult.HANDLED;}return collector.pass("Not a conflict response code.");}
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入&#xff1a;

  • saveToEs &#xff1a;RDD 内容为 Seq[Map]&#xff0c;即一个 Map 对象集合&#xff0c;每个 Map 对应一个文档&#xff1b;
  • saveJsonToEs&#xff1a;RDD 内容为 Seq[String]&#xff0c;即一个 String 集合&#xff0c;每个 String 是一个 JSON 字符串&#xff0c;代表一条记录&#xff08;对应 ES 的 _source&#xff09;。

数据写入可以指定很多配置信息&#xff0c;例如&#xff1a;

  • es.resource&#xff1a;设置写入的索引和类型&#xff0c;索引和类型名均支持动态变量&#xff1b;
  • es.mapping.id&#xff1a;设置文档 _id 对应的字段名&#xff1b;
  • es.mapping.exclude&#xff1a;设置写入时忽略的字段&#xff0c;支持通配符。

val itemRdd &#61; rdd.flatMap(line &#61;> {val topic &#61; line.topic()println("正在处理&#xff1a;" &#43; topic &#43; " - " &#43; line.partition() &#43; " : " &#43; line.offset())val jsonArray &#61; JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScalaval resultMap &#61; jsonArray.map(jsonObj &#61;>{var tmpId &#61; "xxx"var tmpIndex &#61; "xxxxxx"jsonObj.put("myTmpId", tmpId)jsonObj.put("myTmpIndex", tmpIndex)jsonObj.getInnerMap})resultMap
})
val mapConf &#61; Map(("es.resource" , "{myTmpIndex}/doc"),("es.write.operation" , "upsert"),("es.mapping.id" , "myTmpId"),("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合&#xff08;saveToEs&#xff09;&#xff0c;当为 Json 字符串集合时&#xff08;saveJsonToEs&#xff09;会提示不支持的错误信息&#xff1b;这个配置项非常有用&#xff0c;例如 myTmpId 作为文档 id&#xff0c;因此没有必要重复存储到 _source 里面了&#xff0c;可以配置到这个配置项&#xff0c;将其从 _source 中排除。


Any Code&#xff0c;Code Any&#xff01;

扫码关注『AnyCode』&#xff0c;编程路上&#xff0c;一起前行。




推荐阅读
  • python zookeeeper 学习和操作
    1.zookeeeper介绍ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的 ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
  • 基于,docker,快速,部署,多,需求,spark ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • Jmeter对RabbitMQ压力测试
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Jmeter对RabbitMQ压力测试相关的知识,希望对你有一定的参考价值。Jm ... [详细]
  • ZooKeeper 学习
    前言相信大家对ZooKeeper应该不算陌生。但是你真的了解ZooKeeper是个什么东西吗?如果别人面试官让你给他讲讲ZooKeeper是个什么东西, ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Kylin 单节点安装
    软件环境Hadoop:2.7,3.1(sincev2.5)Hive:0.13-1.2.1HBase:1.1,2.0(sincev2.5)Spark(optional)2.3.0K ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 本文_大数据之非常详细Sqoop安装和基本操作
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了大数据之非常详细Sqoop安装和基本操作相关的知识,希望对你有一定的参考价值。大数据大数据之 ... [详细]
author-avatar
skychen88232011
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有