热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark20.sparkGraphx_2_图的转换

1.Graph的创建1.根据边和顶点来创建。defapply[VD:ClassTag,ED:ClassTag](vertices:RDD[(VertexId,VD)],edge

1.Graph的创建


1.根据边和顶点来创建。

def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)],edges: RDD[Edge[ED]],defaultVertexAttr: VD = null.asInstanceOf[VD],edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

2.根据边来创建。

所有顶点的属性相同,都是VD类型的defaultValue。

def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]],defaultValue: VD,edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

3.根据裸边(只有顶点ID)进行创建。

顶点的属性是defaultValue,边的属性为相同顶点边的个数,默认为1。

def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)],defaultValue: VD,uniqueEdges: Option[PartitionStrategy] = None,edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

2.Graph的转换


1.基本信息


  1. numEdges 返回边的数量
  2. numVertices 顶点的个数
  3. inDegrees:VertexRDD[Int] 返回顶点的入度,返回类型为RDD[(VertexId,Int)] Int是入度的具体值。
  4. outDegrees:VertexRDD[Int] 返回顶点的出度,返回类型为RDD[(VertexId,Int)] Int是出度的具体值。
  5. degrees:VertexRDD[Int] 返回顶点的入度与出度之和,返回类型为RDD[(VertexId,Int)] Int是度的具体值。

2.转换操作

def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

对图中的每一个顶点进行map操作,顶点ID不能变,可以将顶点的属性改变成另一种类型。
如:scala> graph.mapVertices((id,attr)=>attr._1+":"+attr._2)

def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

对图中的每个边进行map操作,边的方向不能改变,可以将边的属性改为lin一种类型。

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

对图中的每个三元组进行map操作,只能修改边的属性。


3.结构操作


  • reverse 反转

def reverse: Graph[VD, ED]

反转整个图,将边的方向调头。
如:

graph.reverse.triplets.map(x=>"["+x.srcId+":"+x.srcAttr+"-->"+x.attr+"-->"+x.dstId+":"+x.dstAttr+"]").collect.foreach(println)

  • subgrap 获取子图

def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true),vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]

获取子图。可以通过参数名来指定传参,如果subgraph中有的边没有顶点对应,那么会自动将该边去除。

graph.subgraph(vpred=(id,attr)=>attr._2 == "professor").triplets.map(x=>"["+x.srcId+":"+x.srcAttr+"-->"+x.attr+"-->"+x.dstId+":"+x.dstAttr+"]").collect.foreach(println)

没有边的顶点不会自动被删除。

graph.subgraph((x=>false)).numVertices

  • mask 求两个图的交集

def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

将当前图和other图做交集,返回一个新图。如果other中的属性与原图的属性不同,那么保留原图的属性。

val other =graph.subgraph(vpred=(id,attr)=>attr._2 == "professor").mapVertices((id,attr)=>attr._1 +":"+attr._2)
other.triplets.map(x=>"["+x.srcId+":"+x.srcAttr+"-->"+x.attr+"-->"+x.dstId+":"+x.dstAttr+"]").collect.foreach(println) //输出other
graph.mask(other).triplets.map(x=>"["+x.srcId+":"+x.srcAttr+"-->"+x.attr+"-->"+x.dstId+":"+x.dstAttr+"]").collect.foreach(println)

  • groupEdges 合并两条边

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

合并两条边,通过函数合并边的属性。【注意:两条边要在一个分区内。】


4.聚合


collectNeighbors

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

收集邻居节点的数据,根据指定的方向。返回的数据为RDD[(VertexId,Array[(VertexId,VD)])] 顶点的属性的一个数组。数组中包含邻居节点的顶点。

graph.collectNeighbors(EdgeDirection.In).collect

collectNeighborIds

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

与上一个方法类似,只收集ID。

graph.collectNeighborIds(EdgeDirection.In).collect

aggregateMessages

def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit,mergeMsg: (A, A) => A,tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

每个边都会通过sendMsg发送一个消息,每个顶点都会通过mergeMsg来处理它收到的消息,tripletFields存在主要用于定制EdgeContext对象中的属性的值是否存在,为了减少数据通信量。

//初始化顶点集合val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))//创建顶点的RDD表示val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)//初始化边的集合val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))//创建边的RDD表示val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)//创建一个图val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
graph.aggregateMessages[Array[(VertexId, (String, Int))]](ctx &#61;> ctx.sendToDst(Array((ctx.srcId.toLong, (ctx.srcAttr._1, ctx.srcAttr._2)))), _ &#43;&#43; _).collect.foreach(v &#61;> {println(s"id: ${v._1}"); for (arr <- v._2) {println(s" ${arr._1} (name: ${arr._2._1} age: ${arr._2._2})")}})

5.关联操作


joinVertices

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) &#61;> VD): Graph[VD, ED]

将相同顶点ID的数据进行加权&#xff0c;将U这种类型的数据加入到VD这种类型的数据上&#xff0c;但是不能修改VD的类型。可以使用case class 类型&#xff0c;将VD封装为case class&#xff0c;mapFunc对VD进行属性补全。


outerJoinVertices

def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])( mapFunc: (VertexId, VD, Option[U]) &#61;> VD2)(implicit eq: VD &#61;:&#61; VD2 &#61; null) : Graph[VD2, ED]

和joinVertices类似&#xff0c;只是如果没有相应的节点&#xff0c;那么join的值默认为None。


6.Pregel


前提&#xff1a;

对于节点来说有两种状态&#xff1a;1.钝化态&#xff0c;类似于休眠&#xff0c;不做任何事。2.激活态&#xff0c;干活。
节点能够处于激活态需要有条件&#xff1a;1.节点收到消息或者2.成功发送了任何一条消息。

def pregel[A: ClassTag](initialMsg: A,maxIterations: Int &#61; Int.MaxValue,activeDirection: EdgeDirection &#61; EdgeDirection.Either)(vprog: (VertexId, VD, A) &#61;> VD,sendMsg: EdgeTriplet[VD, ED] &#61;> Iterator[(VertexId, A)],mergeMsg: (A, A) &#61;> A): Graph[VD, ED]

initialMsg 图初试化的时候&#xff0c;开始模型计算的时候&#xff0c;所有节点都会收到一个消息&#xff0c;所有节点都是active的。
maxIterations 最大迭代次数。
activeDirection 规定了发送消息的方向。
柯里化&#xff1a;
vprog 激活态且具有activeDirection的节点调用该消息将聚合后的数据和本节点进行属性的合并。
sendMsg 激活态的节点调用该方法发送消息。
mergeMsg 如果一个节点接收到多个消息&#xff0c;先用mergeMsg来将多条消息聚合成一条消息。如果节点只收到一条消息&#xff0c;则不会调用该函数。
实例&#xff1a;求节点5到各个节点的最短距离。


代码实现&#xff1a;

package com.dengdan.practiceimport org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Edge, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Practice extends App {//屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//设定一个SparkConfval conf &#61; new SparkConf().setAppName("simpleGraphx").setMaster("local[*]")val sc &#61; new SparkContext(conf)//初始化顶点集合val vertexArray &#61; Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))//创建顶点RDD表示val vertexRDD: RDD[(Long, (String, Int))] &#61; sc.parallelize(vertexArray)//初始化边表示val edgeArray &#61; Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))//创建边RDD表示val edgeRDD: RDD[Edge[Int]] &#61; sc.parallelize(edgeArray)//创建图val graph: Graph[(String, Int), Int] &#61; Graph(vertexRDD, edgeRDD)//*************************** 实用操作 ****************************************println("聚合操作")println("**********************************************************")val sourceId: VertexId &#61; 5L //定义源点val initialGraph &#61; graph.mapVertices((id, _) &#61;> if (id &#61;&#61; sourceId) 0.0 else Double.PositiveInfinity)initialGraph.triplets.collect() foreach (println)println("找出5到各顶点的最短距离")val sssp &#61; initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(//id为满足条件的节点的编号&#xff0c;dist是该节点的属性值&#xff0c;newDist为收到消息后&#xff0c;新的属性(id, dist, newDist) &#61;> {println("||||" &#43; id &#43; " 收到消息")math.min(dist, newDist)},triplet &#61;> {println(">>>>" &#43; triplet.srcId &#43; " 发送消息")//源节点的属性 &#43; 边的属性值 <目标节点的属性if (triplet.srcAttr &#43; triplet.attr < triplet.dstAttr) {//发送成功// Interator 表示发送&#xff0c;发送给dstId&#xff0c;发送的内容为 triplet.srcAttr &#43; triplet.attrIterator((triplet.dstId, triplet.srcAttr &#43; triplet.attr))} else {//发送失败Iterator.empty}},(a, b) &#61;> {println("$$$$$")math.min(a, b)} //当前节点所有输入的最短距离)println("--------------图信息------------------")sssp.triplets.collect().foreach(println)println("-------------节点5到各个节点的最短距离---")println(sssp.vertices.collect.mkString("\n"))sc.stop()
}

运行结果&#xff1a;

聚合操作
**********************************************************
((2,Infinity),(1,Infinity),7)
((2,Infinity),(4,Infinity),2)
((3,Infinity),(2,Infinity),4)
((3,Infinity),(6,Infinity),3)
((4,Infinity),(1,Infinity),1)
((2,Infinity),(5,0.0),2)
((5,0.0),(3,Infinity),8)
((5,0.0),(6,Infinity),3)
找出5到各顶点的最短距离
# 初始化时
||||4 收到消息
||||6 收到消息
||||5 收到消息
||||2 收到消息
||||3 收到消息
||||1 收到消息
>>>>4 发送消息
>>>>2 发送消息
>>>>2 发送消息
>>>>5 发送消息
>>>>3 发送消息
>>>>2 发送消息
>>>>3 发送消息
>>>>5 发送消息
# 第一轮迭代
||||3 收到消息
||||6 收到消息
>>>>3 发送消息
>>>>3 发送消息
# 第二轮迭代
||||2 收到消息
>>>>2 发送消息
>>>>2 发送消息
>>>>2 发送消息
# 第三轮迭代
||||1 收到消息
||||4 收到消息
>>>>4 发送消息
# 第四轮迭代
||||1 收到消息
# 迭代结束
--------------图信息------------------
((2,12.0),(1,15.0),7)
((2,12.0),(4,14.0),2)
((3,8.0),(2,12.0),4)
((3,8.0),(6,3.0),3)
((4,14.0),(1,15.0),1)
((2,12.0),(5,0.0),2)
((5,0.0),(3,8.0),8)
((5,0.0),(6,3.0),3)
-------------节点5到各个节点的最短距离---
(1,15.0)
(2,12.0)
(3,8.0)
(4,14.0)
(5,0.0)
(6,3.0)

推荐阅读
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 本文详细介绍了在 Oracle 数据库中使用 MyBatis 实现增删改查操作的方法。针对查询操作,文章解释了如何通过创建字段映射来处理数据库字段风格与 Java 对象之间的差异,确保查询结果能够正确映射到持久层对象。此外,还探讨了插入、更新和删除操作的具体实现及其最佳实践,帮助开发者高效地管理和操作 Oracle 数据库中的数据。 ... [详细]
  • 深入理解 Java 控制结构的全面指南 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 2022年7月20日:关键数据与市场动态分析
    2022年7月20日,本文对当日的关键数据和市场动态进行了深入分析。主要内容包括:1. 关键数据的解读与趋势分析;2. 市场动态的变化及其对投资策略的影响;3. 相关经济指标的评估。通过这些分析,帮助读者更好地理解当前市场环境,为决策提供参考。 ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • Silverlight 实战指南:深入解析用户提交数据的验证与捕获机制
    本文深入探讨了Silverlight中用户提交数据的验证与捕获机制,详细分析了四种主要的验证方法:基本异常处理、DataAnnotation注解、IDataErrorInfo客户端同步验证以及自定义验证策略。通过实例解析,帮助开发者更好地理解和应用这些机制,提升应用程序的数据处理能力和用户体验。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 数据库多表联合查询:内连接与外连接详解
    在数据库的多表查询中,内连接和外连接是两种常用的技术手段。内连接用于检索多个表中相互匹配的记录,即只有当两个表中的记录满足特定的连接条件时,这些记录才会被包含在查询结果中。相比之下,外连接则不仅返回匹配的记录,还可以选择性地返回不匹配的记录,具体取决于左外连接、右外连接或全外连接的选择。本文将详细解析这两种连接方式的使用场景及其语法结构,帮助读者更好地理解和应用多表查询技术。 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
author-avatar
Mayuki命_103
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有