热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark学习笔记(RDD编程基础)

1.RDD创建sparkcore从文件读取linessc.textFile(file:userdata_path)#localScalavallinessc.textF

1. RDD创建 

spark core

从文件读取 

>>> lines = sc.textFile("file:///user/data_path") # local
Scala> val lines = sc.textFile("hdfs://localhost:9000/user/data_path") # from hdfs
>>> lines.foreach(print)

通过并行集合创建

array = [1,2,3,4,5]
rdd = sc.parallelize(array)
rdd.foreach(print)

2. RDD 操作

转换操作 

filter(),map(),flatMap(),groupByKey(),reduceByKey(func)

每次转换操作生成新的RDD惰性机制

《Spark学习笔记(RDD编程基础)》
《Spark学习笔记(RDD编程基础)》

1. filter()

linesWithSpark = lines.filter(lambda line:"Spark" in line)

2.map()

data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x:x+10)
rdd2.foreach(print)

3.flatMap()

# RDD(lines) -> RDD(word array) -> RDD(words)
words = lines.flatMap(lambda line:lines.split(" "))

4.groupByKey

words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.groupByKey()
words1.foreach(print)
# result:
# ('Hadoop',)
# ('is',)

5.reduceByKey

words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.reduceByKey(lambda a,b:a+b)
words1.foreach(print)
# result:
# ('Hadoop',1)
# ('is',2)

首先执行了groupByKey操作,然后运用reduce函数进行了计算

行动操作

count(),collect(),first(),take(n),reduce(func),foreach(func)

rdd = sc.parallelize([1,2,3,4,5])
rdd.count()
# 5
rdd.first()
# 1
rdd.take(3)
# [1,2,3]
rdd.reduce(lambda a,b:a+b)
# 15
rdd.collect()
# [1,2,3,4,5]
rdd.foreach(lambda elem:print(elem))
# 1
# 2
# 3
# 4
# 5

3. 持久化

persist()对一个RDD标记为持久化在行动操作以后实现真正持久化保留在缓存中之后还可以继续利用

.persist(MEMORY_ONLY/MEMORY_AND_DISK ) == RDD.cache()

.unpersist 从内存里删除

list = ['Hadoop','spark']
rdd = sc.parallelize(list)
rdd.cache() # 暂时并不会缓存,这时候rdd还没有被计算生成
print(rdd.count()) # 第一次行动操作,触发一次真正从头到尾的计算,这时候rdd.cahche()才会被实行,把这个rdd放到缓存里
# 2
print(','.join(rdd.collect ())) # 第二次行动操作,不需要从头到尾的计算,只需要重复使用上面缓存中的rdd
# Hadoop,spark

4. RDD 分区 

分区的好处增加并行度 2.减少通信开销(表连接消耗减少)

分区个数 =集群种CPU的个数

Spark.default.parallelism

list = [1,2,3,4,5]
rdd = sc.parallelize(list,2)
#也可以用repartition重新分区

自定义分区方法

from pyspark import SparkConf,SparkContext
def MyPartitioner(key):
return %10
def main():
cOnf= SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(cOnf= conf)
data = sc.parallelize(range(10),5)
data.map(lambda x:(x,1))\
.partitionBy(10,MyPartitioner)\
.map(lambda x:x[0])\
.saveAsTextFile("file:///user/rdd")
if __name__ == '__main__':
main()

运行

$ cd /user/rdd
$ python3 TestPartitioner.py

5. 键值RDD创建

(key,vaule)形式:

读文件/并行集合等方法来建立RDD

lines = sc.textFile('files:///user/local/spark/word.txt')
pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
pairRDD.foreach(print)
# ('I',1)
# ('Hadoop',1)

常用RDD转换操作

  • groupByKey vs. reduceByKey

groupByKey对每个key进行操作,只生成一个seq,本身不能定义函数,需要先用groupByKey生成RDD,然后通过map进行自定义函数操作; reduceByKey对于每个key对应的多个value进行merge操作,可以在本地进行merge操作,并且merge操作可以通过函数自定义。

# using reduceByKey
words = ['one','two','two']
wordPairRDD = sc.parallelize(words).map(lambda word:(word,1))
wordCountsWithReduce = wordPairRDD.reduceByKey(lambda a,b:a+b)
wordCountsWithReduce.foreach(print)
# ('one',1)
# ('two',2)
# using groupByKey
wordCountsWithGroup = wordPairRDD.groupByKey().map(lambda t:(t[0],sum(t[1])))
wordCountsWithGroup.foreach(print)
# ('one',1)
# ('two',2)

  • keys:把pairRDD中的key返回形成一个新的RDD
  • value:把pairRDD中的value返回形成一个新的RDD
  • sortByKey():排序
  • mapValues(func):只对value进行func操作
  • join:join by key

6. 数据读写

一. 本地文件系统/分布式文件系统HDFS

# 本地
textFile = sc.textFile("files:///user/local/word.txt")
textFile.first()
saveAsTextFile("file:///user/rdd/writeback")
# HDFS
textFile = sc.textFile("hdfs://user/hadoop/word.txt")
textFile = sc.textFile("/user/hadoop/word.txt")
textFile = sc.textFile("word.txt")
saveAsTextFile("writeback")

二. HBase 读写

HBsase简介

HBase 是一种列式的分布式数据库,是由当年的 Google 公布的 BigTable 的论文而生。不过这里也要注意 HBase 底层依旧依赖 HDFS 来作为其物理存储。

关系数据库:按行存储;而HBase按单元格保存。

元素:

  • 表: 表由行与列组成
  • 行: 每个行有row key唯一标识
  • 列族: 每个列族有多个列
  • 列限定符:列族里的数据通过列限定符来定位
  • 时间戳:每个单元格都保存则同一份数据的多个版本,采用时间戳进行索引

单元格:行/列族/列限定符 确定一个单元格,数据没有数据类型,字节数byte

四维定位:行+列族+列限定符+时间戳 -> 数据

存储原理:把表格分割成多个单元格在不同电脑上存储

创建一个HBase表

# 启动hadoop
$cd /user/local/hadoop
$./sbin/start-all.sh
#启动HBase
$cd user/local/hbase
$./bin/start-hbase.sh //启动HBase
$./bin/hbase shell //启动hbase shell

创建表

hbase> create 'student','info'
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
# put '表名称', 'row key','column name','value'


推荐阅读
  • Lodash中文文档(v3.10.1)–“Collection”要领TranslatedbyPeckZegOriginalDocs:Lodashv3.10.1Docs乞助翻译文档的 ... [详细]
  •   uni-app开发教程,uni-app实例教程  UNI-APP开发(仿饿)开发课程:进入学习  推荐(免费):uni-app开发教程  文章目录  简介,网 ... [详细]
  • 最近想用js做一个简单的计算器,不过网上的例子好像大部分都是直接从左到右挨个计算,就好像1+2*5,就会先计算1+2,再计算3*5,并没有实现运算符的优先级,这里找到了一种方法实现,来总结一下。不过这 ... [详细]
  • 编程语言是从哪蹦出来的——大型伦理寻根现场
    Hello,我是Alex007,一个热爱计算机编程和硬件设计的小白,为啥是007呢?因为叫Alex的人太多了,再加上每天007的生活,Alex007就诞生了。聊一聊编程到底是啥,怎 ... [详细]
  • 元类print(type(abc))print(type(True))print(type(100))print(type([1,2,3]))print(type({na ... [详细]
  • Mysql MySqlBulkLoader在.NET平台下的批量插入
    批量导入publicboolTranBatchImpo ... [详细]
  • [C#]007_My007App_显示文件夹里所有文件的名字
    添加一个BUTTON,ComboBox,FolderBrowserDialg1.Code:privatevoidbtnShowFile_Click(objectsender,Eve ... [详细]
  • Spark 贝叶斯分类算法
    一、贝叶斯定理数学基础我们都知道条件概率的数学公式形式为即B发生的条件下A发生的概率等于A和B同时发生的概率除以B发生的概率。根据此公式变换,得到贝叶斯公式:即贝叶斯定律是关于随机 ... [详细]
  • 每次用到v-charts我都一阵头疼,因为明明是相同的功能,但是我好像每次用到的解决方法都不一样??每次都是在api中各种查,各种尝试…直到做了个各种数据图形的需求,决定还是好好整 ... [详细]
  • 也就是|单打_.net Core 自我学习随笔——工作的核心:增删改查
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了.netCore自我学习随笔——工作的核心:增删改查相关的知识,希望对你有一定的参考价值。上次介绍了一下如何进行连接 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
author-avatar
溪边莎草
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有