作者:手机用户2502863087 | 来源:互联网 | 2023-09-13 13:08
RDD的持久化也就是说假如我们从hdfs读取文件,形成RDD。当我们对RDD进行持久化操作之后,,然后再针对该RDD进行action操作(这里我们假设执行count操作,中间可能经
RDD的持久化也就是说假如我们从hdfs读取文件,形成RDD。当我们对RDD进行持久化操作之后,
,然后再针对该RDD进行action操作(这里我们假设执行count操作,中间可能经历了一系列transformation操作),虽然第一次count()操作执行完了,但是也不会清除掉RDD中的数据,而是将其缓存在内存或者磁盘上。当第二次再执行count操作时,就不会重新从hdfs上读取数据,形成新的RDD,而是直接从RDD所在的所有节点的缓存中直接读取,对数据直接执行count操作,避免了重复计算。所以如果我们合理使用RDD的持久化机制,可以有效提高应用程序的性能。
从源码来看调用cache()方法,内部调用persist()方法 :
def cache(): this.type = persist()
persist()方法的默认的sotrageLevel 是MEMORY_ONLY,所以cache()方法默认使用内存缓存。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def persist(newLevel: StorageLevel): this.type = {
// 如果用户设置了checkpoint,我们需要覆盖旧的storage level。
// checkpoint机制会将RDD的数据容错到文件系统上(比如说
hdfs),如果在对RDD进行计算的时候,发现存储的数据不在了,
会先找一下checkpoint数据,如果有就是用checkpoint的数据,就不
需要去计算了。
if (isLocallyCheckpointed){
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
}else{
persist(newLevel, allowOverride = false)
}