热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

原创:自定义sparkGraphX中的collectNeighborIds方法

***自定义收集VertexId的neighborIds*@authorTongXueQiang*defcollectNeighborIds[T,U](edgeDirection:
/**
* 自定义收集VertexId的neighborIds
* @author TongXueQiang
*/
def collectNeighborIds[T,U](edgeDirection:EdgeDirection,graph:Graph[T,U])(implicit m:scala.reflect.ClassTag[T],n:scala.reflect.ClassTag[U]):VertexRDD[Array[VertexId]] = {
val nbrs = graph.mapReduceTriplets[Array[VertexId]](
//map函数
edgeTriplets => {
val msgTosrc = (edgeTriplets.srcId,Array(edgeTriplets.dstId));
val msgTodst = (edgeTriplets.dstId,Array(edgeTriplets.srcId));
edgeDirection match {
case EdgeDirection.Either =>Iterator(msgTosrc,msgTodst)
case EdgeDirection.Out => Iterator(msgTosrc)
case EdgeDirection.In => Iterator(msgTodst)
case EdgeDirection.Both => throw new SparkException("It doesn't make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)")
}
},_ ++ _)//reduce函数
nbrs
}
测试:
object Test {
  
  System.setProperty("hadoop.home.dir","D://hadoop-2.6.2");
  val cOnf= new SparkConf().setMaster("local").setAppName("SparkGraph");
  val sc = new SparkContext(conf);

  def main(args:Array[String]):Unit = {
    val graph = GraphGenerators.logNormalGraph(sc,numVertices = 100).map((id,_) => id.toDouble);
    collectNeighborIds(EdgeDirection.In,graph).foreach(line => {print(line._1+":"); for (elem <- line._2) {print(elem + " ")};println;});

}



}

推荐阅读
author-avatar
残破的前进
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有