Apache Spark:按键将RDD分成多个RDD以保存值

 吖鸟集团总公司 发布于 2022-12-31 13:51

我使用Spark 1.0.1处理大量数据.每行包含一个ID号,一些包含重复的ID.我想在同一位置保存具有相同ID号的所有行,但我无法有效地执行此操作.我创建了(ID号,数据行)对的RDD [(String,String)]:

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

一种有效但不具备性能的方法是收集ID号,过滤每个ID的RDD,并使用与文本文件相同的ID保存值的RDD.

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

我还尝试了groupByKey或reduceByKey,以便RDD中的每个元组包含一个唯一的ID号作为键,以及由该ID号的新行分隔的一组组合数据行.我想只使用foreach迭代RDD一次来保存数据,但是它不能将值作为RDD给出

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

基本上,我想通过ID号将RDD拆分为多个RDD,并将该ID号的值保存到它们自己的位置.

1 个回答
  • 我认为这个问题类似于 通过键Spark写入多个输出 - 一个Spark作业

    请在那里参考答案.

    import org.apache.hadoop.io.NullWritable
    
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    
    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any = 
        NullWritable.get()
    
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
        key.asInstanceOf[String]
    }
    
    object Split {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Split" + args(1))
        val sc = new SparkContext(conf)
        sc.textFile("input/path")
        .map(a => (k, v)) // Your own implementation
        .partitionBy(new HashPartitioner(num))
        .saveAsHadoopFile("output/path", classOf[String], classOf[String],
          classOf[RDDMultipleTextOutputFormat])
        spark.stop()
      }
    }
    

    刚看到上面的类似答案,但实际上我们不需要自定义分区.MultipleTextOutputFormat将为每个键创建文件.具有相同键的多个记录可以归入同一分区.

    new HashPartitioner(num),其中num是您想要的分区号.如果您有大量不同的密钥,可以将数字设置为大.在这种情况下,每个分区都不会打开太多的hdfs文件处理程序.

    2022-12-31 13:54 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有