1.RDD创建sparkcore从文件读取linessc.textFile(file:userdata_path)#localScalavallinessc.textF
1. RDD创建
spark core
从文件读取
>>> lines = sc.textFile("file:///user/data_path") # local
Scala> val lines = sc.textFile("hdfs://localhost:9000/user/data_path") # from hdfs
>>> lines.foreach(print)
通过并行集合创建
array = [1,2,3,4,5]
rdd = sc.parallelize(array)
rdd.foreach(print)
2. RDD 操作
转换操作
filter(),map(),flatMap(),groupByKey(),reduceByKey(func)
每次转换操作生成新的RDD惰性机制
1. filter()
linesWithSpark = lines.filter(lambda line:"Spark" in line)
2.map()
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x:x+10)
rdd2.foreach(print)
3.flatMap()
# RDD(lines) -> RDD(word array) -> RDD(words)
words = lines.flatMap(lambda line:lines.split(" "))
4.groupByKey
words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.groupByKey()
words1.foreach(print)
# result:
# ('Hadoop',)
# ('is',)
5.reduceByKey
words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.reduceByKey(lambda a,b:a+b)
words1.foreach(print)
# result:
# ('Hadoop',1)
# ('is',2)
首先执行了groupByKey操作,然后运用reduce函数进行了计算
行动操作
count(),collect(),first(),take(n),reduce(func),foreach(func)
rdd = sc.parallelize([1,2,3,4,5])
rdd.count()
# 5
rdd.first()
# 1
rdd.take(3)
# [1,2,3]
rdd.reduce(lambda a,b:a+b)
# 15
rdd.collect()
# [1,2,3,4,5]
rdd.foreach(lambda elem:print(elem))
# 1
# 2
# 3
# 4
# 5
3. 持久化
persist()对一个RDD标记为持久化在行动操作以后实现真正持久化保留在缓存中之后还可以继续利用
.persist(MEMORY_ONLY/MEMORY_AND_DISK ) == RDD.cache()
.unpersist 从内存里删除
list = ['Hadoop','spark']
rdd = sc.parallelize(list)
rdd.cache() # 暂时并不会缓存,这时候rdd还没有被计算生成
print(rdd.count()) # 第一次行动操作,触发一次真正从头到尾的计算,这时候rdd.cahche()才会被实行,把这个rdd放到缓存里
# 2
print(','.join(rdd.collect ())) # 第二次行动操作,不需要从头到尾的计算,只需要重复使用上面缓存中的rdd
# Hadoop,spark
4. RDD 分区
分区的好处增加并行度 2.减少通信开销(表连接消耗减少)
分区个数 =集群种CPU的个数
Spark.default.parallelism
list = [1,2,3,4,5]
rdd = sc.parallelize(list,2)
#也可以用repartition重新分区
自定义分区方法
from pyspark import SparkConf,SparkContext
def MyPartitioner(key):
return %10
def main():
cOnf= SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(cOnf= conf)
data = sc.parallelize(range(10),5)
data.map(lambda x:(x,1))\
.partitionBy(10,MyPartitioner)\
.map(lambda x:x[0])\
.saveAsTextFile("file:///user/rdd")
if __name__ == '__main__':
main()
运行
$ cd /user/rdd
$ python3 TestPartitioner.py
5. 键值RDD创建
(key,vaule)形式:
读文件/并行集合等方法来建立RDD
lines = sc.textFile('files:///user/local/spark/word.txt')
pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
pairRDD.foreach(print)
# ('I',1)
# ('Hadoop',1)
常用RDD转换操作
- groupByKey vs. reduceByKey
groupByKey对每个key进行操作,只生成一个seq,本身不能定义函数,需要先用groupByKey生成RDD,然后通过map进行自定义函数操作; reduceByKey对于每个key对应的多个value进行merge操作,可以在本地进行merge操作,并且merge操作可以通过函数自定义。
# using reduceByKey
words = ['one','two','two']
wordPairRDD = sc.parallelize(words).map(lambda word:(word,1))
wordCountsWithReduce = wordPairRDD.reduceByKey(lambda a,b:a+b)
wordCountsWithReduce.foreach(print)
# ('one',1)
# ('two',2)
# using groupByKey
wordCountsWithGroup = wordPairRDD.groupByKey().map(lambda t:(t[0],sum(t[1])))
wordCountsWithGroup.foreach(print)
# ('one',1)
# ('two',2)
- keys:把pairRDD中的key返回形成一个新的RDD
- value:把pairRDD中的value返回形成一个新的RDD
- sortByKey():排序
- mapValues(func):只对value进行func操作
- join:join by key
6. 数据读写
一. 本地文件系统/分布式文件系统HDFS
# 本地
textFile = sc.textFile("files:///user/local/word.txt")
textFile.first()
saveAsTextFile("file:///user/rdd/writeback")
# HDFS
textFile = sc.textFile("hdfs://user/hadoop/word.txt")
textFile = sc.textFile("/user/hadoop/word.txt")
textFile = sc.textFile("word.txt")
saveAsTextFile("writeback")
二. HBase 读写
HBsase简介
HBase 是一种列式的分布式数据库,是由当年的 Google 公布的 BigTable 的论文而生。不过这里也要注意 HBase 底层依旧依赖 HDFS 来作为其物理存储。
关系数据库:按行存储;而HBase按单元格保存。
元素:
- 表: 表由行与列组成
- 行: 每个行有row key唯一标识
- 列族: 每个列族有多个列
- 列限定符:列族里的数据通过列限定符来定位
- 时间戳:每个单元格都保存则同一份数据的多个版本,采用时间戳进行索引
单元格:行/列族/列限定符 确定一个单元格,数据没有数据类型,字节数byte
四维定位:行+列族+列限定符+时间戳 -> 数据
存储原理:把表格分割成多个单元格在不同电脑上存储
创建一个HBase表
# 启动hadoop
$cd /user/local/hadoop
$./sbin/start-all.sh
#启动HBase
$cd user/local/hbase
$./bin/start-hbase.sh //启动HBase
$./bin/hbase shell //启动hbase shell
创建表
hbase> create 'student','info'
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
# put '表名称', 'row key','column name','value'