支持的格式
- 文件系统:比如NFS, HDFS, S3, TEXT, JSON等
- 使用Spark SQL处理结构化数据:比如Json,APACHE HIVE等
- 键值对的数据库:比如CASSANDRA, HBASE, ELASTICSEARCH, JDBC等
文件系统
下面是一些常见的,在spark中使用的文件系统:
Text Files
加载文件只需要调用textFile()这个函数即可。
d = sc.textFile('README.md')
r = d.filter(lambda line: "Python" in line)
print(r.first())
保存数据到文件也只需要调用这个方法saveAsTextFile()即可。
r.saveAsTextFile('tt.txt')
Json
导入json文件
import json
d = sc.textFile('json_demo.txt')
data = d.map(lambda x: json.loads(x))
def f(x):
print(x['teacher_id'])
data.foreach(f)
导出json文件
import json
d = sc.textFile('json_demo.txt')
data = d.map(lambda x: json.loads(x))
data.filter(lambda x: x['grade_id'] == 8).map(lambda x: json.dumps(x))\
.saveAsTextFile('b.json')
CSV文件
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=['name', 'other'])
return reader.next()
input = sc.textFile('csvfile').map(loadRecord)
SequenceFiles
读取SequenceFiles
SequenceFiles是Hadoop的key/value类型的文件格式。Spark有独特的API来读取SequenceFile,我们可以在SparkContext上面调用sequenceFile(path, keyClass, valueClass, minPartitions)。
val data = sc.sequenceFile(inFile, “org.apache.hadoop.io.Text”, “org.apache.hadoop.io.IntWritable”)
保存SequenceFiles
因为SequenceFiles是key/value对,所以我们需要使用 PairRDD 类型来写入。
data = sc.parallelize({("panda", 3), ("snail", 2)})
data.saveAsSequenceFile('sqe.hdfs')
Object Files
Object File与SequenceFile有一点区别,就是Object File保存在RDD里面只有values。
读取object文件调用 pickleFile(),写入调用saveAsPickleFile(),它们使用python的pickle serialization库完成的。
文件压缩
在处理大数据的时候,我们经常需要对数据进行压缩以便减少网络使用量和节约存储空间。我们已经知道Spark函数(textFile和sequenceFile)可以自动的帮我们处理一些压缩文件。
选择一个压缩的格式会对将来使用数据的用户有非常大的影响。因为分布式系统例如spark,它经常会从不同的机器读取数据,为了使worker能够这样工作是因为我们需要让worker知道哪里是新数据的开始。但是一些压缩格式就不支持这种功能,所以它就需要我们的single node读取所有的数据来找到开始的地方。允许能够从多个机器读取数据的编码格式我们称之为“splittable”。
注意:textFile()可以处理compressed input,但是它会自动的禁止splittable及时文件的压缩形式是支持splittable的。
一些input format例如(SequenceFiles)允许我们仅仅对value进行压缩,这样可以方便我们通过key对数据进行查找。
介绍SPARK支持的文件系统 Filesystems
Spark支持许多的可以写和读的文件系统。
Local/“Regular” FS
Spark支持从本地的文件系统读取数据,但是必须要求这个文件在所有集群机器的节点上都存在。一些网络文件系统例如(NFS, AFS和MapR NFS)等,都可以直接使用file://path;这种方式把数据传给SPARK,前提要求是只要你的文件系统已经挂载到了每个节点的相同path下面。
如果你的文件并没有分配在所有集群的节点下面,那么最好的办法是现在本地的加载然后通过 parallelize来分发这些内容给worker。这种方法是比较慢的,所以我们推荐把文件放在共享文件系统例如HDFS, NFS和S3。
Amazon S3
建议如果使用Amazon S3,则服务器最好使用Amazon EC2。
HDFS
Hadoop文件系统与Spark可以很好的结合起来,他们可以在同一台机器上搭建服务,使用Hadoop的文件只需要指定 hdfs://master:port/path 即可。
Spark Sql处理结构化数据
所谓的结构化数据,就是存在schema的数据,Spark Sql支持各种结构化的数据,它能够有效率的读取数据源中的字段。
我们输入一个SPARK SQL是一个SQL QUERY,可以在数据源中运行,然后会将读取到的数据作为一个row objects RDD返回。
Apache Hive
一个非常常见的结构化数据源是Apache Hive,Hive能够以各种各样的形式保存表,从纯文本到面向列的格式,在HDFS里面或者不同的存储系统。
通过SPARK SQL连接到一个已经存在的HIVE,我们需要提供HIVE的配置。比如复制hive-site.xml到Spark的conf目录下面。一旦你完成这个,你就可以创建 HiveContext object。然后通过HiveContext可以编写HIVE QUERY LANGUAGE(HQL) query来读取表数据。
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql(“SELECT name, age FROM users”)
firstRow = rows.first()
print firstRow.name
介绍SPARK支持的数据库 Database
这里我们介绍4个最常见和使用的connector。
Java Database Connectivity
JDBC包括Mysql,Postgres和其他的系统。为了访问它们的数据,我们需要在SparkContext上面使用 org.apache.spark.rdd.JdbcRDD。
Cassandra
Spark与Cassandra是结合使用非常好的一个组合。因为Cassandra不属于Spark,所以我们需要添加依赖来使用Cassandra。Cassandra不使用Spark Sql,但是它返回的CassandraRow objects与Spark Sql的 Row objects差不多。
与Elasticsearch类似,Cassandra connector读取属性来决定连接到哪个cluster。我们设置 spark.cassandra.connection.host 来指定Cassandra cluster,spark.cassandra.auth.username 和spark.cassandra.auth.password 来指定用户名和密码。
HBase
Spark可以通过Hadoop input格式来访问HBase。
Elasticsearch
Spark可以直接对Elasticsearch进行读写。因为Elasticsearch是做的mapping inference,所以建议显示的对非字符串的数据设置mapping。