作者:se8529106 | 来源:互联网 | 2023-08-25 17:20
需先在sparkConf新增以下三个自定义配置项:是否开启自动重分区分区sparkConf.set(“enable.auto.repartition”,“true”)避免不必要的重
需先在sparkConf新增以下三个自定义配置项:
//是否开启自动重分区分区
sparkConf.set(“enable.auto.repartition”,“true”)
//避免不必要的重分区操作,增加个阈值,只有该批次要消费的kafka的分区内数据大于该阈值才进行拆分
sparkConf.set(“per.partition.offsetrange.threshold”,“300”)
//拆分后,每个kafkardd 的分区数据量。
sparkConf.set(“per.partition.after.partition.size”,“100”)
rdd的分区数,是由rdd的getPartitions函数决定。比如kafkardd的getPartitions方法实现如下:
offsetRanges其实就是一个数组:OffsetRange存储一个kafka分区元数据及其offset范围对应KafkaRDDPartition
可以在offsetRange生成的时候做下转换。spark2.x位置是DirectKafkaInputDstream的compute方法。
spark1.6位置是KafkaRDD apply方法 具体实现:
spark 2.x的实现如下图: