作者: | 来源:互联网 | 2023-09-24 11:14
spark的checkpoint机制
在spark code开发时,有时会用到sparkContext().setCheckpointDir(/hdfspath)来做一个检查点,本文就来聊下为什么要创建这样一个检查点。
为什么要做checkpoint?
在spark计算里面,如果计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,当然就比较费心能。如果我们将中间计算结果通过cache或者persist放到内存或者磁盘中,但是也是有可能会出现内存出问题或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所有就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是hdfs)。
什么时候写checkpoint数据?
- 当RDD的action算子触发计算结束后会执行checkpoint。
- 在spark streaming 中每创建一个batch的RDD也会触发checkpoint操作。
什么时候读checkpoint数据?
task计算失败的时候会从checkpoint读取数据进行计算。
checkpoint具体实现有哪些?
首先需要用sparkContext设置hdfs的checkpoint的目录:
sparkContext().setCheckpointDir("hdfs://yolanda:9000/checkpoint");
上面代码执行之后,hdfs里面会创建一个目录:
/checkpoint/c1a51ee9-1daf-4169-991e-b290f88bac20
然后执行checkpoint
val rdd1=sparkContext.textFile("hdfs://yolanda:9000/wordcount/input")
val rdd2=rdd1.flatMap(_.split(" ")).map(_,1).reduceByKey(_+_)
rdd2.checkpoint
这时里面是不会有数据的,因为checkpoint是一个transformation,要触发action才可以,下面我们进行一个action操作:
rdd2.collect
这时会启动两个任务,一个是计算collect结果,另一个是计算rdd2中的数据,并将其保存到hdfs的目录上去。
- 这个rdd之前所有的依赖关系会被移除掉,也就是说你再进行计算时,直接从这个hdfs目录中去读取数据,而不需要再根据rdd的依赖关系去重新计算,这样节省了很多计算。
- 建议先将rdd缓存一下,这样会直接对内存中的数据进行ck,即:
rdd2.cache().checkpoint
不然的话还要启动一个任务根据rdd的依赖关系去重新计算。