热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark,Parquet和S3–非常复杂

(这篇文章的版本最初发布在AppsFlyer的博客中。还要特别感谢来自AppsFlyer数据团队的MorriFeldman和MichaelSpector,

(这篇文章的版本最初发布在AppsFlyer的博客中 。还要特别感谢来自AppsFlyer数据团队的Morri Feldman和Michael Spector ,他们做了大部分工作来解决本文中讨论的问题)

TL; DR; Spark,Parquet和S3(&Mesos)的结合是一个功能强大,灵活且具有成本效益的分析平台(并且是Hadoop的替代产品)。 但是,要使所有这些技术融为一体并完美地发挥作用并非易事。 这篇文章描述了我们(AppsFlyer)在基于这些技术构建分析平台时面临的挑战,以及为缓解这些问题并使它们全部起作用而采取的步骤。

Spark之所以成为Map / Reduce的主要替代产品,有几个原因,其中包括不同Hadoop发行版的广泛采用,将批处理和流式传输结合在一个平台上以及不断增长的机器学习集成库(无论是在包含的算法方面)以及与机器学习语言(即R和Python)的集成。 在AppsFlyer,我们已经将Spark用作ETL(提取,转换和加载)和分析的主要框架已有一段时间了。 最近的一个例子是我们最近发布的保留报告的新版本 ,该报告利用Spark通过ETL(主要是数据清理)和分析(迈向全面点击欺诈检测的垫脚石)处理了多个数据流(每天大于1TB)。生成报告。

一对

我们在此报告中介绍的主要更改之一是从基于Sequence文件构建到使用Parquet文件的转变。 Parquet是一种柱状数据格式,它可能是当今用于存储长期大数据以进行分析的最佳选择(除非您对Hive进行了大量投资,而Orc是更合适的格式)。 Parquet vs. Sequence文件的优势是性能和压缩,而又不会失去大数据工具(Spark,Hive,Drill,Tajo,Presto等)的广泛支持。

大数据基础架构的一个相对独特的方面是,我们不使用Hadoop(也许这是另一篇文章的主题)。 我们使用Mesos代替YARN作为资源管理器,并且使用Amazon S3代替HDFS作为分布式存储解决方案。 HDFS相对于S3具有几个优点,但是,与使用S3相比,在AWS上维护长时间运行的HDFS集群的成本/收益不胜枚举。

就是说,Spark,Parquet和S3的组合给我们带来了一些挑战,这篇文章将列出主要挑战以及我们想出的解决方案。

实木复合地板和火花

Parquet和Spark似乎已经处于爱恨交加的状态了一段时间了。 一方面,Spark文档将Parquet吹捧为大数据分析的最佳格式之一(事实如此),另一方面,Spark中对Parquet的支持不完整且令人讨厌使用。 事情肯定会朝着正确的方向发展,但仍有一些怪癖和陷阱需要提防。

首先,在过去几个月中,Spark和Parquet集成已经取得了很大的进步。 以前, 为了将现有数据转换为Parquet , 人们不得不跳过障碍 。 将DataFrames引入Spark使得此过程变得非常简单。 当DataFrame API支持输入格式时,例如,输入是JSON(内置)或Avro(尚未在Spark中内置,但您可以使用库来读取它),转换为Parquet只是一个问题一方面读取输入格式,另一方面将其保留为Parquet。 考虑一下Scala中的以下代码片段:

val inputPath = "../data/json"
val outputPath = "../data/parquet"
val data = sqlContext.read.json(inputPath)
date.write.parquet(outputPath)

即使当您处理模式不属于数据的格式时,转换过程也非常简单,因为Spark允许您以编程方式指定模式。 Spark文档非常简单明了,并包含Scala,Java和Python中的示例 。 此外,用其他语言定义架构也不太复杂。 例如,在这里(AppsFlyer),我们使用Clojure作为我们的主要开发语言,因此我们开发了几个辅助函数来实现此目的。 下面的示例代码提供了详细信息:

第一件事是从我们拥有的任何结构中提取数据并指定我们喜欢的模式。 下面的代码获取一个事件记录,并将事件记录中的各种数据点提取为[:column_name value optional_data_type]形式的向量。 请注意,数据类型是可选的,因为如果未指定,则默认为字符串。

(defn record-builder[event-record](let [..raw-device-params (extract event-record "raw_device_params")result [...[:operator (get raw-device-params "operator")][:model (get raw-device-params "model")]...[:launch_counter counter DataTypes/LongType]]]result))

下一步是使用上述结构提取模式并转换为DataFrame行:

(defn extract-dataframe-schema[rec](let [fields (reduce (fn [lst schema-line](let [k (first schema-line)t (if (= (count schema-line) 3) (last schema-line) DataTypes/StringType) ](conj lst (DataTypes/createStructField (name k) t NULLABLE)))) [] rec)arr (ArrayList. fields)](DataTypes/createStructType arr)))(defn as-rows[rec](let [values (object-array (reduce (fn [lst v] (conj lst v)) [] rec))](RowFactory/create values)))

最后,我们将这些功能应用于RDD上,将其转换为数据帧并另存为镶木地板:

(let [..schema (trans/extract-dataframe-schema (record-builder nil))..rdd (spark/map record-builder some-rdd-we-have)rows (spark/map trans/as-rows rdd)dataframe (spark/create-data-frame sql-context rows schema)]
(spark/save-parquert dataframe output-path :overwrite))

如上所述,Parquet和Spark的事情还在不断发展,但路途还不明确。 我们遇到的一些问题包括:

  • 1.4版本中的一个严重错误,其中在编写镶木地板文件时出现竞争状况会导致大量工作数据丢失(此错误已在1.4.1中修复-因此,如果您昨天使用Spark 1.4和镶木地板升级!)
  • 过滤器下推式优化,由于Spark仍然使用Parquet 1.6.0rc3,因此默认情况下处于关闭状态-尽管1.6.0已经推出了一段时间(似乎Spark 1.5将使用Parquet 1.7.0,所以可以解决问题)
  • Spark并不“本地”支持Parquet,相反,Spark依赖Hadoop对Parquet格式的支持-这本身不是问题,但是对我们来说,当我们尝试将Spark和Parquet与S3一起使用时,这引起了主要的性能问题-更多在下一节中

镶木地板,Spark和S3

Amazon S3(简单存储服务)是一种使用成本相对较低的对象存储解决方案。 与“真实”文件系统相比,它确实有一些缺点。 最主要的是最终的一致性,即一个过程所做的更改对其他应用程序而言不会立即可见。 (如果您使用的是Amazon的EMR,则可以使用EMRFS“一致视图”来克服此问题。)但是,如果您了解此限制,则S3仍然是可行的输入和输出源,至少对于批处理作业而言。

如上所述,Spark没有本地S3实现,它依赖Hadoop类来抽象对Parquet的数据访问。 Hadoop为S3提供了3个文件系统客户端:

  • S3块文件系统(格式为“ s3:// ..”的URI模式) 这似乎不适用于Spark
  • S3本机文件系统(“ s3n:// ..” URI)–下载支持Hadoop 2. *及更高版本的Spark发行版(如果要使用,请使用tl; dr –不使用)
  • S3a – S3n的替代品,消除了S3n的某些限制和问题。 下载“使用Hadoop 2.6及更高版本的Spark”支持以使用此功能(tl; dr –您需要此功能,但在使用前需要做一些工作)

当使用Spark 1.3时,尝试使用S3时遇到了许多问题,因此我们开始使用s3n –它在大多数情况下都有效,例如,我们可以运行并完成作业,但是其中许多作业因各种读取超时而失败,并且主机未知例外。 观察工作中的任务,失败的比例甚至更高,使我们将超时和重试次数增加到荒谬的水平。 当我们转到Spark 1.4.1时,我们尝试了s3a时遇到了另一次麻烦。 这次我们开始工作了。 我们要做的第一件事是将spark.executor.extraClassPath和spark.executor.extraDriverPath都设置为指向aws-java-sdk和hadoop-aws jars,因为“ Spark with Hadoop 2.6”显然都缺失了它们。建立 。 自然地,我们使用了这些文件的2.6版本,但后来遇到了这个小问题 。 Hadoop 2.6 AWS实施存在一个错误,该错误导致它以意想不到的方式拆分S3文件(例如,一个400个文件的作业运行了1800万个任务)幸运的是,使用Hadoop AWS jar版本2.7.0而不是2.6 版本解决了此问题–因此,设置了所有s3a前缀后,它们就可以正常工作(并且比s3n具有更好的性能)。

找到合适的S3 Hadoop库有助于提高工作的稳定性,但是无论使用S3库(s3n还是s3a),使用Parquet文件的Spark作业的性能仍然很差。 在查看Spark UI时,处理数据的实际工作似乎很合理,但是Spark在实际开始工作之前以及工作“完成”之后实际终止之前花费了大量时间。 我们喜欢称这种现象为“木地板税”。

显然,我们不能忍受“木地板税”,因此我们深入研究了工作日志并发现了一些问题。 第一个与Parquet作业的启动时间有关。 生成Spark的人知道架构可以随着时间的推移而发展,并为DataFrames提供了一个很好的功能,称为“模式合并”。 如果您查看大数据湖/水库中的架构(或今天所说的名称),则可以肯定会随着时间的推移而演进。 但是,如果您查看的是单个作业的结果目录,则架构没有任何区别……事实证明,当Spark初始化作业时,它将读取所有Parquet文件的页脚以执行架构合并。 所有这些工作都是在将任何任务分配给执行程序之前由驱动程序完成的,并且可能要花费很长时间甚至数小时(例如,我们的工作需要回顾半年的安装数据)。 它没有记录,但是查看Spark代码,您可以通过将mergeSchema指定为false来覆盖此行为:

在Scala中:

val file = sqx.read.option("mergeSchema", "false").parquet(path)

在Clojure中:

(-> ^SQLContext sqtx(.read)(.format "parquet")(.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))(.load))

请注意,这在Spark 1.3中不起作用。 在Spark 1.4中,它按预期工作,在Spark 1.4.1中,它使Spark只查看_common_metadata文件,这不是世界末日,因为它是一个小文件,并且每个目录中只有一个。 但是,这使我们进入了“木地板税”的另一个方面,即“工作终止”的延迟。

关闭架构合并和控制Spark使用的架构有助于减少作业启动时间&#xff0c;但是&#xff0c;如上所述&#xff0c;在作业结束时&#xff0c;我们仍然遭受了长时间的延迟。 使用文本文件时&#xff0c;我们已经知道一个与Hadoop <-> S3相关的问题。 Hadoop是不可变的&#xff0c;首先将文件写入临时目录&#xff0c;然后将其复制。 使用S3&#xff0c;这不是问题&#xff0c;但是复制操作非常昂贵。 通过文本文件&#xff0c;DataBricks创建了DirectOutputCommitter &#xff08;可能是针对他们的Spark SaaS产品&#xff09;。 替换文本文件的输出提交者非常简单–您只需在Spark配置上设置“ spark.hadoop.mapred.output.committer.class”即可&#xff0c;例如&#xff1a;

(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")

Parquet存在类似的解决方案&#xff0c;与文本文件解决方案不同&#xff0c;它甚至是Spark发行版的一部分。 但是&#xff0c;要使事情复杂化&#xff0c;您必须在Hadoop配置而不是Spark配置上进行配置。 要获取Hadoop配置&#xff0c;您首先需要从Spark配置创建一个Spark上下文&#xff0c;在其上调用hadoopConfiguration&#xff0c;然后按如下所示设置“ spark.sql.parquet.output.committer.class”&#xff1a;

(let [ctx (spark/spark-context conf)hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)](.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))

使用DirectParquetOutputCommitter可以大大降低“木地板税”&#xff0c;但我们仍然发现某些工作需要很长时间才能完成。 问题再次出在罪魁祸首是文件系统假设Spark和Hadoop。 请记住&#xff0c;“ _ common_metadata” Spark着眼于一项工作的开始–嗯&#xff0c;Spark在工作结束时花费了大量时间来创建此文件和一个附加的MetaData文件&#xff0c;并使用目录中文件的附加信息。 同样&#xff0c;这都是从一个地方&#xff08;驱动程序&#xff09;完成的&#xff0c;而不是由执行者处理的。 当作业产生小的文件时&#xff08;即使有成千上万的文件&#xff09;&#xff0c;该过程将花费合理的时间。 但是&#xff0c;当作业生成较大的文件时&#xff08;例如&#xff0c;当我们摄取一整天的应用程序启动时&#xff09;&#xff0c;此过程可能需要一个小时以上。 与mergeSchema一样&#xff0c;解决方案是手动管理元数据&#xff0c;因此我们将“ parquet.enable.summary-metadata”设置为false&#xff08;同样在Hadoop配置上&#xff0c;并自行生成_common_metadata文件&#xff08;用于大型作业&#xff09;

综上所述&#xff0c;Parquet尤其是Spark正在开发中-使尖端技术为您服务可能是一个挑战&#xff0c;需要大量挖掘。 该文档有时远非完美&#xff0c;但是幸运的是&#xff0c;所有相关技术都是开源的&#xff08;甚至是Amazon SDK &#xff09;&#xff0c; 因此您始终可以深入研究错误报告&#xff0c;代码等 。 了解事情的实际运作方式并找到所需的解决方案。 此外&#xff0c;您有时会发现文章和博客文章&#xff0c;这些文章和博客文章解释了如何克服所用技术中的常见问题。 我希望这篇文章能消除集成Spark&#xff0c;Parquet和S3的一些复杂问题&#xff0c;这些复杂问题最终都是具有巨大潜力的所有出色技术。

翻译自: https://www.javacodegeeks.com/2015/08/spark-parquet-and-s3-its-complicated.html




推荐阅读
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 程度|也就是_论文精读:Neural Architecture Search without Training
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了论文精读:NeuralArchitectureSearchwithoutTraining相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 当计算任务越来越多,作业提交越来越多,企业普通的做法是,在原有的系统架构上,不停地往上堆积硬件或者加服务器。的确,hadoop设计上的优秀和可扩展性可以方便的 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • Windows下配置PHP5.6的方法及注意事项
    本文介绍了在Windows系统下配置PHP5.6的步骤及注意事项,包括下载PHP5.6、解压并配置IIS、添加模块映射、测试等。同时提供了一些常见问题的解决方法,如下载缺失的msvcr110.dll文件等。通过本文的指导,读者可以轻松地在Windows系统下配置PHP5.6,并解决一些常见的配置问题。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 基于事件驱动的并发编程及其消息通信机制的同步与异步、阻塞与非阻塞、IO模型的分类
    本文介绍了基于事件驱动的并发编程中的消息通信机制,包括同步和异步的概念及其区别,阻塞和非阻塞的状态,以及IO模型的分类。同步阻塞IO、同步非阻塞IO、异步阻塞IO和异步非阻塞IO等不同的IO模型被详细解释。这些概念和模型对于理解并发编程中的消息通信和IO操作具有重要意义。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • 我们先从我们最常用的事件处理开始,Rxjs对事件的处理有一个其实现的效果如下图: ... [详细]
  • 开发笔记:大三上寒假15天第5天
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了大三上寒假15天--第5天相关的知识,希望对你有一定的参考价值。昨天的下载完成后运行报错,应该是下载的spark版本和教 ... [详细]
  • 荐
                                                        应用程序负载均衡器概述
    关注公众号:AWS爱好者(iloveaws)文|沉默恶魔(禁止转载,转载请先经过作者同意)网站:www.iloveaws.cnHello大家好,欢迎来到《AWS解决方案架构师 ... [详细]
author-avatar
茴忆是抹不掉的伤
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有