热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark累加器及广播变量

累加器提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。假设我们在从文件中读取呼号列表对应的日志,同时也想知道输入文
累加器

提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。 假设我们在从文件中读取呼号列表对应的日志, 同时也想知道输入文件中有多少空行,下面的python程序使用累加器完成了这一点

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

上面的例子中创建了一个叫作 blankLines 的 Accumulator[Int] 对象,然后在输入
中看到一个空行时就对其加 1。执行完转化操作之后, 就打印出累加器中的值。注意,只有在运行 saveAsTextFile() 行动操作后才能看到正确的计数,因为行动操作前的转化操作flatMap() 是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作 flatMap() 被saveAsTextFile() 行动操作强制触发时才会开始求值。

  • 首先通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。 返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
  • Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add)增加累加器的值。
  • 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访问累加器的值

注意工作节点上的任务不能访问累加器的值(即不能读取累加器的值)。从这些任务的角度来看,累加器是一个只写变量。
下面的示例来验证呼号器:
内容:

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

代码如下:

# 创建用来验证呼号的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)
def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
return True
else:
invalidSignCount += 1
return False
# 对与每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
cOntactCount= validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x
+ y)
# 强制求值计算计数
contactCount.count()
if invalidSignCount.value <0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)

累加器的容错性

  • 对于要在行动操作中使用的累加器, Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。
  • 转化操作中使用的累加器, 就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。 举个例子,当一个被缓存下来但是没有经常使用的 RDD 在第一次从 LRU 缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制RDD 根据其谱系进行重算, 而副作用就是这也会使得谱系中的转化操作里的累加器进行更新,并再次发送到驱动器中。在转化操作中,累加器通常只用于调试目的。
广播变量
  • 可以让程序高效地向所有工作节点发送一个较大的只读值, 以供一个或多个 Spark 操作使用。下面是一个广播变量的使用例子:

# 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
# 读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryCOntactCounts= (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x+ y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

主要的使用步骤为:

  • 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
    任何可序列化的类型都可以这么实现。
  • 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
  • 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)广播变量其实就是类型为 spark.broadcast.Broadcast[T] 的一个对象,其中存放着类型为 T 的值。可以在任务中通过对Broadcast 对象调用 value 来获取该对象的值。这个值只会被发送到各节点一次,使用的是一种高效的类似 BitTorrent 的通信机制

广播的优化主要通过选择既快又好的序列化格式来实现。你可以使用 spark.serializer 属性选择另一个序列化库来优化序列化过程。

基于分区的操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作, 都是我们应当尽量避免为每个元素都配置一次的工作。 Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。
下面的例子有一个在线的业余电台呼号数据库,可以用这个数据库查询日志中记录过的联系人呼号列表。 通过使用基于分区的操作,可以在每个分区内共享一个数据库连接池, 来避免建立太多连接,同时还可以重用 JSON 解析器。

def processCallSigns(signs):
"""使用连接池查询呼号"""
# 创建一个连接池
http = urllib3.PoolManager()
# 与每条呼号记录相关联的URL
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
# 创建请求(非阻塞)
requests = map(lambda x: (x, http.request('GET', x)), urls)
# 获取结果
result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
# 删除空的结果并返回
return filter(lambda x: x[1] is not None, result)
def fetchCallSigns(input):
"""获取呼号"""
return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
cOntactsContactList= fetchCallSigns(validSigns)

主要有三种按分区执行的操作符

《Spark累加器及广播变量》

mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。当计算平均值时,一种方法是将数值 RDD 转为二元组 RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作。下面举一个例子:
首先是 不使用mapPartitions()来求平均值。

def combineCtrs(c1, c2):
return (c1[0] + c2[0], c1[1] + c2[1])
def basicAvg(nums):
"""计算平均值"""
nums.map(lambda num: (num, 1)).reduce(combineCtrs)

第二个是使用其来求平均值:

def partitionCtr(nums):
"""计算分区的sumCounter"""
sumCount = [0, 0]
for num in nums:
sumCount[0] += num
sumCount[1] += 1
return [sumCount]
def fastAvg(nums):
"""计算平均值"""
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
return sumCount[0] / float(sumCount[1])
管道

Spark 的 pipe() 方法可以让我们使用任意一种语言实现 Spark 作业中的部分逻辑,只要它能读写 Unix 标准流就行。通过 pipe(),你可以将 RDD 中的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出——这个过程就是 RDD 的转化操作过程。这种接口和编程模型有较大的局限性, 但是有时候这恰恰是你想要的,比如在 map 或filter 操作中使用某些语言原生的函数。

数值RDD操作

Spark对包含数据的RDD提供了描述性的统计操作。Spark 的数值操作是通过流式算法实现的, 允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。下面对这些方法进行了介绍。

《Spark累加器及广播变量》

下面使用汇总统计来从数据中移除一些异常值。由于我们会两次使用同一个 RDD(一次用来计算汇总统计数据,另一次用来移除异常值),因此应该把这个 RDD 缓存下来。这里移除掉了较远的联系点。

# 要把String类型RDD转为数字数据,这样才能
# 使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasOnableDistances= distanceNumerics.filter(
lambda x: math.fabs(x - mean) <3 * stddev)
print reasonableDistances.collect()

推荐阅读
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 本地存储组件实现对IE低版本浏览器的兼容性支持 ... [详细]
  • 优化后的标题:Apache Cassandra数据写入操作详解
    本文详细解析了 Apache Cassandra 中的数据写入操作,重点介绍了 INSERT 命令的使用方法。该命令主要用于将数据插入到指定表的列中,其基本语法为 `INSERT INTO 表名 (列1, 列2, ...) VALUES (值1, 值2, ...)`。通过具体的示例和应用场景,文章深入探讨了如何高效地执行数据写入操作,以提升系统的性能和可靠性。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Flowable 流程图路径与节点展示:已执行节点高亮红色标记,增强可视化效果
    在Flowable流程图中,通常仅显示当前节点,而路径则需自行获取。特别是在多次驳回的情况下,节点可能会出现混乱。本文重点探讨了如何准确地展示流程图效果,包括已结束的流程和正在执行的流程。具体实现方法包括生成带有高亮红色标记的图片,以增强可视化效果,确保用户能够清晰地了解每个节点的状态。 ... [详细]
  • 本文详细介绍了在MySQL中如何高效利用EXPLAIN命令进行查询优化。通过实例解析和步骤说明,文章旨在帮助读者深入理解EXPLAIN命令的工作原理及其在性能调优中的应用,内容通俗易懂且结构清晰,适合各水平的数据库管理员和技术人员参考学习。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • 在Django中提交表单时遇到值错误问题如何解决?
    在Django项目中,当用户提交包含多个选择目标的表单时,可能会遇到值错误问题。本文将探讨如何通过优化表单处理逻辑和验证机制来有效解决这一问题,确保表单数据的准确性和完整性。 ... [详细]
author-avatar
雨季莫犹忆
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有