热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

textFile_TextFile分区问题

篇首语:本文由编程笔记#小编为大家整理,主要介绍了TextFile分区问题相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了TextFile分区问题相关的知识,希望对你有一定的参考价值。









val rdd = sc.parallelize(List(1,2,3,4,5,6),第二参数)
这里的第二参数 获取方式有两种:
1.直接给定值,根据传入的值决定分区的数量
2.根据运行环境获取分区数量(core) -->例如 本地运行 设置为local 此时设置分区值默认分区就是1个

val rdd = sc.textFile(path: String, minPartitions: Int = defaultMinPartitions)
读取文件中内容算子中有两个参数 第一个参数是获取数据路径
这个理第二个参数,第二参数决定了分区的数量有两种情况
1.在不传递值的情况,使用是默认defaultMinPartitions --> 这个值时多少?
2.在传递分区数量是时候,这个分区值是多少

第一条主线 --> defaultMinParititons值时多少?
1.先从textFile这个算子入手,进入后台源码
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
2.在不传入分区数值的情况下,默认textFile中使用了一个值defaultMinPartitions,这个值就决定了分区数量,查看这个值
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
发现defaultMinPartitions,并不是一个值而是一个方法,在这个方法中实现是一个math比较最小值
这个比较中有一共值时固定是 2 这个值,和2比较时有一个全新的参数defaultParallelism,需要查看这个参数

3.继续拆安defaultParallelism这个值的时候发现他也是一个方法
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
方法最后一句是整个方法的返回一直,也就是说这个方法获取值,是最后一句产生,这个产生值还触发了一个TaskScheduler(任务调度),此时defaultParallelism
当查看这个方法的时候发现这个方法并没有实现体,这个方法是存在在特质中 def defaultParallelism(): Int
ctrl+alt+ 左右 回到之前调用或下一次调用(必须知道实现者是谁)
在触发抽方法的位置 --> ctrl+atl+鼠标左键-->就可以查看实现这个方法或触发这个方法的类
发现这个抽象方法实现类 --> TaskSchedulerImpl在这个类中有方法的实现

4.TaskSchedulerImpl在这个类中有方法的实现
override def defaultParallelism(): Int = backend.defaultParallelism()
发现原来抽象方法已经被重写了,并且有一个实现,此时只需要触发defaultParallelism就可以触发出这个值多少了
但是,点击查看后发现也是一个抽象方法defaultParallelism() --> 对这个实现在此查询实现者即可

5.查看defaultParallelism()
此时发现实现方式有两种
1.CoarseGrainedSchedulerBackend spark集群模式
2.LocalSchedulerBackend 本地模式
我们查看是集群模式,结果发现了
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

}
这个方法中比较的是最大值,其中第一个采纳数是totalCoreCount即集群核心数 第二个参数固是2

结论: 在调用textFile算子的时候,初始默认分区数量是2,除非小于2,否则默认分区数量就是2个

第二条主线 --> 查看分区计算流程
问题:先阶段已经知道分区数量默认是2个分区,具体分区中计算方式时候什么样式?(分片逻辑)

1.还是在textFile这个算子实现中
已经知道分区数量之后,查看内部对分区数量的使用,需要查看方法的实现
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
分区参数传入到一个叫做 hadoopFile中,所以此时就需要查看hadoopFile是谁

2.查看hadoopFile
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()

// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)

// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val cOnfBroadcast= broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)


//核心出现在这个位置,这里创建了一个HadoopRDD对象
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}

3.查看HadoopRDD中存在哪些操作?
在这里类中
override def getPartitions: Array[Partition] = {
val jobCOnf= getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
getPartitions是切片方法的触发
val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 这个方法是具体的切分
val array = new Array[Partition](inputSplits.size) 就是获取分片个数
需要查看getSplits
4.查看 getSplits方法
这个方法是接口中抽象方法,此时需要使用 ctrl+atl+鼠标左边 查看这个方法的实现
一般处理数据方式都是 FileInputFormat类中查看 getSplits方法
这个方法和MR中切片放啊其实逻辑是一样的,核心位置
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// totalSize 获取文件的大小

Spark中和MR中切片最大的不同位置出现了,Spark会计算切片大小
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
启动numSplits就是之前minPartitions即 默认分区值

最终切片的位置依旧保留着MR中思想即 1.1冗余
long splitSize = computeSplitSize(goalSize, minSize, blockSize); //这里会计算真正切片的大小

long bytesRemaining = length; //文件大小
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //切片逻辑
//切完一片之后 会减去切片大小
bytesRemaining -= splitSize;

总结:分区数量其实是可以影响最最终文件的个数,但是在最终输出界过之前,会执行分片处理,这个分片才是最终输出分区的个数,我们若需要影响最终输出值,此时可以在最终输出算子之前调用 repartition 来修改分区

 



推荐阅读
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 这篇文章主要介绍了Python拼接字符串的七种方式,包括使用%、format()、join()、f-string等方法。每种方法都有其特点和限制,通过本文的介绍可以帮助读者更好地理解和运用字符串拼接的技巧。 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • 如何在mysqlshell命令中执行sql命令行本文介绍MySQL8.0shell子模块Util的两个导入特性importTableimport_table(JS和python版本 ... [详细]
author-avatar
开着宝马X6去赶集_692
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有