热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark学习笔记(RDD编程基础)

1.RDD创建sparkcore从文件读取linessc.textFile(file:userdata_path)#localScalavallinessc.textF

1. RDD创建 

spark core

从文件读取 

>>> lines = sc.textFile("file:///user/data_path") # local
Scala> val lines = sc.textFile("hdfs://localhost:9000/user/data_path") # from hdfs
>>> lines.foreach(print)

通过并行集合创建

array = [1,2,3,4,5]
rdd = sc.parallelize(array)
rdd.foreach(print)

2. RDD 操作

转换操作 

filter(),map(),flatMap(),groupByKey(),reduceByKey(func)

每次转换操作生成新的RDD惰性机制

《Spark学习笔记(RDD编程基础)》
《Spark学习笔记(RDD编程基础)》

1. filter()

linesWithSpark = lines.filter(lambda line:"Spark" in line)

2.map()

data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x:x+10)
rdd2.foreach(print)

3.flatMap()

# RDD(lines) -> RDD(word array) -> RDD(words)
words = lines.flatMap(lambda line:lines.split(" "))

4.groupByKey

words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.groupByKey()
words1.foreach(print)
# result:
# ('Hadoop',)
# ('is',)

5.reduceByKey

words=sc.parallelize([("Hadoop",1),("is",1),("is",1)])
words1=words.reduceByKey(lambda a,b:a+b)
words1.foreach(print)
# result:
# ('Hadoop',1)
# ('is',2)

首先执行了groupByKey操作,然后运用reduce函数进行了计算

行动操作

count(),collect(),first(),take(n),reduce(func),foreach(func)

rdd = sc.parallelize([1,2,3,4,5])
rdd.count()
# 5
rdd.first()
# 1
rdd.take(3)
# [1,2,3]
rdd.reduce(lambda a,b:a+b)
# 15
rdd.collect()
# [1,2,3,4,5]
rdd.foreach(lambda elem:print(elem))
# 1
# 2
# 3
# 4
# 5

3. 持久化

persist()对一个RDD标记为持久化在行动操作以后实现真正持久化保留在缓存中之后还可以继续利用

.persist(MEMORY_ONLY/MEMORY_AND_DISK ) == RDD.cache()

.unpersist 从内存里删除

list = ['Hadoop','spark']
rdd = sc.parallelize(list)
rdd.cache() # 暂时并不会缓存,这时候rdd还没有被计算生成
print(rdd.count()) # 第一次行动操作,触发一次真正从头到尾的计算,这时候rdd.cahche()才会被实行,把这个rdd放到缓存里
# 2
print(','.join(rdd.collect ())) # 第二次行动操作,不需要从头到尾的计算,只需要重复使用上面缓存中的rdd
# Hadoop,spark

4. RDD 分区 

分区的好处增加并行度 2.减少通信开销(表连接消耗减少)

分区个数 =集群种CPU的个数

Spark.default.parallelism

list = [1,2,3,4,5]
rdd = sc.parallelize(list,2)
#也可以用repartition重新分区

自定义分区方法

from pyspark import SparkConf,SparkContext
def MyPartitioner(key):
return %10
def main():
cOnf= SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(cOnf= conf)
data = sc.parallelize(range(10),5)
data.map(lambda x:(x,1))\
.partitionBy(10,MyPartitioner)\
.map(lambda x:x[0])\
.saveAsTextFile("file:///user/rdd")
if __name__ == '__main__':
main()

运行

$ cd /user/rdd
$ python3 TestPartitioner.py

5. 键值RDD创建

(key,vaule)形式:

读文件/并行集合等方法来建立RDD

lines = sc.textFile('files:///user/local/spark/word.txt')
pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
pairRDD.foreach(print)
# ('I',1)
# ('Hadoop',1)

常用RDD转换操作

  • groupByKey vs. reduceByKey

groupByKey对每个key进行操作,只生成一个seq,本身不能定义函数,需要先用groupByKey生成RDD,然后通过map进行自定义函数操作; reduceByKey对于每个key对应的多个value进行merge操作,可以在本地进行merge操作,并且merge操作可以通过函数自定义。

# using reduceByKey
words = ['one','two','two']
wordPairRDD = sc.parallelize(words).map(lambda word:(word,1))
wordCountsWithReduce = wordPairRDD.reduceByKey(lambda a,b:a+b)
wordCountsWithReduce.foreach(print)
# ('one',1)
# ('two',2)
# using groupByKey
wordCountsWithGroup = wordPairRDD.groupByKey().map(lambda t:(t[0],sum(t[1])))
wordCountsWithGroup.foreach(print)
# ('one',1)
# ('two',2)

  • keys:把pairRDD中的key返回形成一个新的RDD
  • value:把pairRDD中的value返回形成一个新的RDD
  • sortByKey():排序
  • mapValues(func):只对value进行func操作
  • join:join by key

6. 数据读写

一. 本地文件系统/分布式文件系统HDFS

# 本地
textFile = sc.textFile("files:///user/local/word.txt")
textFile.first()
saveAsTextFile("file:///user/rdd/writeback")
# HDFS
textFile = sc.textFile("hdfs://user/hadoop/word.txt")
textFile = sc.textFile("/user/hadoop/word.txt")
textFile = sc.textFile("word.txt")
saveAsTextFile("writeback")

二. HBase 读写

HBsase简介

HBase 是一种列式的分布式数据库,是由当年的 Google 公布的 BigTable 的论文而生。不过这里也要注意 HBase 底层依旧依赖 HDFS 来作为其物理存储。

关系数据库:按行存储;而HBase按单元格保存。

元素:

  • 表: 表由行与列组成
  • 行: 每个行有row key唯一标识
  • 列族: 每个列族有多个列
  • 列限定符:列族里的数据通过列限定符来定位
  • 时间戳:每个单元格都保存则同一份数据的多个版本,采用时间戳进行索引

单元格:行/列族/列限定符 确定一个单元格,数据没有数据类型,字节数byte

四维定位:行+列族+列限定符+时间戳 -> 数据

存储原理:把表格分割成多个单元格在不同电脑上存储

创建一个HBase表

# 启动hadoop
$cd /user/local/hadoop
$./sbin/start-all.sh
#启动HBase
$cd user/local/hbase
$./bin/start-hbase.sh //启动HBase
$./bin/hbase shell //启动hbase shell

创建表

hbase> create 'student','info'
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
# put '表名称', 'row key','column name','value'


推荐阅读
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ... [详细]
  • python模块之正则
    re模块可以读懂你写的正则表达式根据你写的表达式去执行任务用re去操作正则正则表达式使用一些规则来检测一些字符串是否符合个人要求,从一段字符串中找到符合要求的内容。在 ... [详细]
  • 本文介绍了如何将包含复杂对象的字典保存到文件,并从文件中读取这些字典。 ... [详细]
  • 使用方法:将要控制的角色拖到TargetBody,将相机的焦点拖到CamerPivot,,建议CameraPivot是一个放在TargetBody下的子物体,并且位置应该是在Tar ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • C#实现文件的压缩与解压
    2019独角兽企业重金招聘Python工程师标准一、准备工作1、下载ICSharpCode.SharpZipLib.dll文件2、项目中引用这个dll二、文件压缩与解压共用类 ... [详细]
  • 本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
  • 本文将继续探讨 JavaScript 函数式编程的高级技巧及其实际应用。通过一个具体的寻路算法示例,我们将深入分析如何利用函数式编程的思想解决复杂问题。示例中,节点之间的连线代表路径,连线上的数字表示两点间的距离。我们将详细讲解如何通过递归和高阶函数等技术实现高效的寻路算法。 ... [详细]
  • 深入解析C#中app.config文件的配置与修改方法
    在C#开发过程中,经常需要对系统的配置文件进行读写操作,如系统初始化参数的修改或运行时参数的更新。本文将详细介绍如何在C#中正确配置和修改app.config文件,包括其结构、常见用法以及最佳实践。此外,还将探讨exe.config文件的生成机制及其在不同环境下的应用,帮助开发者更好地管理和维护应用程序的配置信息。 ... [详细]
  • 本文详细介绍了 com.apollographql.apollo.api.internal.Optional 类中的 orNull() 方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。 ... [详细]
author-avatar
溪边莎草
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有