热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark加载和保存数据

支持的格式文件系统:比如NFS,HDFS,S3,TEXT,JSON等使用SparkSQL处理结构化数据:比如Json,APACHEHIVE等键值对的数据库:比如

支持的格式
  • 文件系统:比如NFS, HDFS, S3, TEXT, JSON等
  • 使用Spark SQL处理结构化数据:比如Json,APACHE HIVE等
  • 键值对的数据库:比如CASSANDRA, HBASE, ELASTICSEARCH, JDBC等

文件系统

下面是一些常见的,在spark中使用的文件系统:
这里写图片描述

Text Files

加载文件只需要调用textFile()这个函数即可。

d = sc.textFile('README.md')
r = d.filter(lambda line: "Python" in line)
print(r.first())

保存数据到文件也只需要调用这个方法saveAsTextFile()即可。

r.saveAsTextFile('tt.txt')

Json

导入json文件

import json
d = sc.textFile('json_demo.txt')
data = d.map(lambda x: json.loads(x))

def f(x):
print(x['teacher_id'])
data.foreach(f)

导出json文件

import json
d = sc.textFile('json_demo.txt')
data = d.map(lambda x: json.loads(x))
data.filter(lambda x: x['grade_id'] == 8).map(lambda x: json.dumps(x))\
.saveAsTextFile('b.json')

CSV文件

import csv
import StringIO

def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=['name', 'other'])
return reader.next()
input = sc.textFile('csvfile').map(loadRecord)

SequenceFiles

读取SequenceFiles

SequenceFiles是Hadoop的key/value类型的文件格式。Spark有独特的API来读取SequenceFile,我们可以在SparkContext上面调用sequenceFile(path, keyClass, valueClass, minPartitions)。

val data = sc.sequenceFile(inFile, “org.apache.hadoop.io.Text”, “org.apache.hadoop.io.IntWritable”)

保存SequenceFiles

因为SequenceFiles是key/value对,所以我们需要使用 PairRDD 类型来写入。

data = sc.parallelize({("panda", 3), ("snail", 2)})
data.saveAsSequenceFile('sqe.hdfs')

Object Files

Object File与SequenceFile有一点区别,就是Object File保存在RDD里面只有values。
读取object文件调用 pickleFile(),写入调用saveAsPickleFile(),它们使用python的pickle serialization库完成的。

文件压缩

在处理大数据的时候,我们经常需要对数据进行压缩以便减少网络使用量和节约存储空间。我们已经知道Spark函数(textFile和sequenceFile)可以自动的帮我们处理一些压缩文件。

选择一个压缩的格式会对将来使用数据的用户有非常大的影响。因为分布式系统例如spark,它经常会从不同的机器读取数据,为了使worker能够这样工作是因为我们需要让worker知道哪里是新数据的开始。但是一些压缩格式就不支持这种功能,所以它就需要我们的single node读取所有的数据来找到开始的地方。允许能够从多个机器读取数据的编码格式我们称之为“splittable”。
这里写图片描述

注意:textFile()可以处理compressed input,但是它会自动的禁止splittable及时文件的压缩形式是支持splittable的。

一些input format例如(SequenceFiles)允许我们仅仅对value进行压缩,这样可以方便我们通过key对数据进行查找。

介绍SPARK支持的文件系统 Filesystems

Spark支持许多的可以写和读的文件系统。

Local/“Regular” FS

Spark支持从本地的文件系统读取数据,但是必须要求这个文件在所有集群机器的节点上都存在。一些网络文件系统例如(NFS, AFS和MapR NFS)等,都可以直接使用file://path;这种方式把数据传给SPARK,前提要求是只要你的文件系统已经挂载到了每个节点的相同path下面。

如果你的文件并没有分配在所有集群的节点下面,那么最好的办法是现在本地的加载然后通过 parallelize来分发这些内容给worker。这种方法是比较慢的,所以我们推荐把文件放在共享文件系统例如HDFS, NFS和S3。

Amazon S3

建议如果使用Amazon S3,则服务器最好使用Amazon EC2。

HDFS

Hadoop文件系统与Spark可以很好的结合起来,他们可以在同一台机器上搭建服务,使用Hadoop的文件只需要指定 hdfs://master:port/path 即可。

Spark Sql处理结构化数据

所谓的结构化数据,就是存在schema的数据,Spark Sql支持各种结构化的数据,它能够有效率的读取数据源中的字段。

我们输入一个SPARK SQL是一个SQL QUERY,可以在数据源中运行,然后会将读取到的数据作为一个row objects RDD返回。

Apache Hive

一个非常常见的结构化数据源是Apache Hive,Hive能够以各种各样的形式保存表,从纯文本到面向列的格式,在HDFS里面或者不同的存储系统。

通过SPARK SQL连接到一个已经存在的HIVE,我们需要提供HIVE的配置。比如复制hive-site.xml到Spark的conf目录下面。一旦你完成这个,你就可以创建 HiveContext object。然后通过HiveContext可以编写HIVE QUERY LANGUAGE(HQL) query来读取表数据。

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql(“SELECT name, age FROM users”)
firstRow = rows.first()
print firstRow.name

介绍SPARK支持的数据库 Database

这里我们介绍4个最常见和使用的connector。

Java Database Connectivity

JDBC包括Mysql,Postgres和其他的系统。为了访问它们的数据,我们需要在SparkContext上面使用 org.apache.spark.rdd.JdbcRDD。

Cassandra

Spark与Cassandra是结合使用非常好的一个组合。因为Cassandra不属于Spark,所以我们需要添加依赖来使用Cassandra。Cassandra不使用Spark Sql,但是它返回的CassandraRow objects与Spark Sql的 Row objects差不多。

与Elasticsearch类似,Cassandra connector读取属性来决定连接到哪个cluster。我们设置 spark.cassandra.connection.host 来指定Cassandra cluster,spark.cassandra.auth.username 和spark.cassandra.auth.password 来指定用户名和密码。

HBase

Spark可以通过Hadoop input格式来访问HBase。

Elasticsearch

Spark可以直接对Elasticsearch进行读写。因为Elasticsearch是做的mapping inference,所以建议显示的对非字符串的数据设置mapping。


推荐阅读
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文详细介绍了 Spark 中的弹性分布式数据集(RDD)及其常见的操作方法,包括 union、intersection、cartesian、subtract、join、cogroup 等转换操作,以及 count、collect、reduce、take、foreach、first、saveAsTextFile 等行动操作。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 本文详细介绍了如何将Spring框架与Hibernate ORM框架进行集成,包括配置文件的设置和数据持久化操作的实现。 ... [详细]
  • 本文介绍了如何查看PHP网站及其源码的方法,包括环境搭建、本地测试、源码查看和在线查找等步骤。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 优化后的标题:Apache Cassandra数据写入操作详解
    本文详细解析了 Apache Cassandra 中的数据写入操作,重点介绍了 INSERT 命令的使用方法。该命令主要用于将数据插入到指定表的列中,其基本语法为 `INSERT INTO 表名 (列1, 列2, ...) VALUES (值1, 值2, ...)`。通过具体的示例和应用场景,文章深入探讨了如何高效地执行数据写入操作,以提升系统的性能和可靠性。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 在PHP中如何正确调用JavaScript变量及定义PHP变量的方法详解 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
author-avatar
爱情只有确定键没有取消键_874
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有