Take top N after groupBy and treat them as an RDD

 Posted by 金色阳光CC on 2022-12-07 20:10

I'd like to get the top N items per key after groupByKey on an RDD, and convert the type of topNPerGroup (below) to RDD[(String, Int)], i.e. with the List[Int] values flattened.

The data is

val data = sc.parallelize(Seq("foo"->3, "foo"->1, "foo"->2,
                              "bar"->6, "bar"->5, "bar"->4))

The top N items per group are computed by:

val topNPerGroup: RDD[(String, List[Int])] = data.groupByKey.map { 
   case (key, numbers) => 
       key -> numbers.toList.sortBy(-_).take(2)
}

The result is

(bar,List(6, 5))
(foo,List(3, 2))

as printed by

topNPerGroup.collect.foreach(println)

If I could achieve this, topNPerGroup.collect.foreach(println) would generate the expected result:

(bar, 6)
(bar, 5)
(foo, 3)
(foo, 2)
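For the flattening step itself, here is a minimal sketch (not part of the original post) using Spark's standard flatMapValues on the topNPerGroup RDD defined above:

// Flatten RDD[(String, List[Int])] into RDD[(String, Int)]
// by emitting one (key, value) pair per element of each list.
val flattened = topNPerGroup.flatMapValues(identity)

// Equivalent formulation with flatMap:
// topNPerGroup.flatMap { case (key, values) => values.map(key -> _) }

flattened.collect.foreach(println)

This only addresses the type conversion; the answers below also address the scaling concerns of groupByKey.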

emeth.. 7

Spark 1.4.0 solves this problem.

Take a look at https://github.com/apache/spark/commit/5e6ad24ff645a9b0f63d9c0f17193550963aa0a7

It uses a BoundedPriorityQueue with aggregateByKey:

def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = {
  self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
    seqOp = (queue, item) => {
      queue += item
    },
    combOp = (queue1, queue2) => {
      queue1 ++= queue2
    }
  ).mapValues(_.toArray.sorted(ord.reverse))  // This is a min-heap, so we reverse the order.
}
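A usage sketch (an assumption, not from the original answer): with spark-mllib on the classpath, the MLPairRDDFunctions implicit conversion makes topByKey available directly on the question's data RDD:

import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

// data: RDD[(String, Int)] from the question; Ordering[Int] is resolved implicitly.
val top2 = data.topByKey(2)   // RDD[(String, Array[Int])], values sorted descending

top2.collect.foreach { case (key, values) => println(key -> values.toList) }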


kberg.. 6

I have recently been struggling with this same problem, but my need was a little different in that I needed the top K values per key with a data set of the form (key: Int, (domain: String, count: Long)). While your dataset is simpler, there is still a scaling/performance issue with groupByKey, as noted in the documentation:

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.

In my case I ran into problems quickly, because my Iterable in (K, Iterable<V>) was very large, over one million elements, so sorting and taking the top N became very expensive and created potential memory issues.

After some digging (see the references below), here is a full example using combineByKey to accomplish the same task in a way that will perform and scale:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopNForKey {

  var SampleDataset = List(
    (1, ("apple.com", 3L)),
    (1, ("google.com", 4L)),
    (1, ("stackoverflow.com", 10L)),
    (1, ("reddit.com", 15L)),
    (2, ("slashdot.org", 11L)),
    (2, ("samsung.com", 1L)),
    (2, ("apple.com", 9L)),
    (3, ("microsoft.com", 5L)),
    (3, ("yahoo.com", 3L)),
    (3, ("google.com", 4L)))

  //sort and trim a traversable (String, Long) tuple by _2 value of the tuple
  def topNs(xs: TraversableOnce[(String, Long)], n: Int) = {
    var ss = List[(String, Long)]()
    var min = Long.MaxValue
    var len = 0
    xs foreach { e =>
      if (len < n || e._2 > min) {
        ss = (e :: ss).sortBy((f) => f._2)
        min = ss.head._2
        len += 1
      }
      if (len > n) {
        ss = ss.tail
        min = ss.head._2
        len -= 1
      }
    }
    ss
  }

  def main(args: Array[String]): Unit = {

    val topN = 2
    val sc = new SparkContext("local", "TopN For Key")
    val rdd = sc.parallelize(SampleDataset).map((t) => (t._1, t._2))

    //use combineByKey to allow spark to partition the sorting and "trimming" across the cluster
    val topNForKey = rdd.combineByKey(
      //seed a list for each key to hold your top N's with your first record
      (v) => List[(String, Long)](v),
      //add the incoming value to the accumulating top N list for the key
      (acc: List[(String, Long)], v) => topNs(acc ++ List((v._1, v._2)), topN).toList,
      //merge top N lists returned from each partition into a new combined top N list
      (acc: List[(String, Long)], acc2: List[(String, Long)]) => topNs(acc ++ acc2, topN).toList)

    //print results sorting for pretty
    topNForKey.sortByKey(true).foreach((t) => {
      println(s"key: ${t._1}")
      t._2.foreach((v) => {
        println(s"----- $v")
      })

    })

  }

}

Here is what I get back in the returned RDD...

(1, List(("google.com", 4L),
         ("stackoverflow.com", 10L))
(2, List(("apple.com", 9L),
         ("slashdot.org", 15L))
(3, List(("google.com", 4L),
         ("microsoft.com", 5L))

References

https://www.mail-archive.com/user@spark.apache.org/msg16827.html

/sf/ask/17360801/

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
