在spark 应用中,在处理函数,变量的作用范围和生命周期需要十分的注意特别是在spark集群模式下。比如如下的模式在rdd的操作中修改变量的值。
下面举一个例子
rdd 求和,比如下面一段代码,在看起来结果像是45,但是实际上是0
var sc = new SparkContext(new SparkConf().setMaster("local"))val array=Array(1,2,3,4,5,6,7,8,9)var test=new Testvar rddArray=sc.parallelize(array)rddArray.foreach(i=>{test.sum=test.sum+i;println(test.sum)})println("sum:"+test.sum)
上述代码的行为是未定义的,并且不同模式下运行的情况不一样,为了执行作业,Spark 将RDD 操作分解成tasks,每个task 由executor 执行。在执行之前spark 会计算task 的闭包。闭包是Executor 在RDD 上进行计算的时候必须可见的那些变量和方法。闭包会被序列化成并发送给每个executor 。发送给每个Executor 的闭包的变量是副本。因此在foreach 函数悲剧引用计数器的时候他操作的不是driver 节点上面的对象,执行者只能看到序列化闭包的副本,所以最终值并没有改变。
为了如果要实现上面的功能可以使用Accumerlator 。spark 中累加专门用于提供的一种机制,用于集群工作之间安全的更新变量
另一种方式就是使用collect 但是这可能会导致driver 内存不足, 更安全的使用take 的方式。
欢迎关注,更多惊喜等着你