The example from the GraphX Programming Guide:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  (3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")),
  (2L, ("istoica", "prof"))))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")))
val graph = Graph(users, relationships)
val rankedGraph = graph.staticPageRank(3)   // run exactly 3 PageRank iterations
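The guide goes on to join the ranks back to the usernames. A minimal sketch of that follow-up step, reusing the users RDD defined above:

val ranksByUsername = users.join(rankedGraph.vertices).map {
  case (id, ((username, role), rank)) => (username, rank)
}
// print each (username, rank) pair on its own line
println(ranksByUsername.collect.mkString("\n"))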
A real-world example: analyzing the transfer logs of biecaibaikuai (别踩白块):
val transfer = sc.textFile("hdfs://LDKJSERVER1046:8020/user/flume/transfer/*/*/*/*.tsv")
  .filter(line => line.contains("biecaibaikuai"))

val structuredTransferRDD = transfer.map(line => {
  val trunks = line.split("\t")
  // keep six columns of every well-formed 35-column record;
  // malformed lines yield Unit here and are filtered out in the next step
  if (trunks.length == 35) {
    (trunks(6), trunks(7), trunks(3), trunks(5), trunks(12), trunks(13))
  }
})
// drop the Unit results from malformed lines, then recover the tuple type
val rdd = structuredTransferRDD.filter(arg => arg != ())
  .map(arg => arg.asInstanceOf[(String, String, String, String, String, String)])
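The filter-then-cast dance is needed because the map above returns Unit for malformed lines, making the RDD Any-typed. A more idiomatic sketch (my suggestion, not the original author's code) emits Options instead, so no cast is needed; the hypothetical name structuredTransferRDD2 stands in for the variable above:

val structuredTransferRDD2 = transfer.flatMap { line =>
  val trunks = line.split("\t")
  if (trunks.length == 35)
    Some((trunks(6), trunks(7), trunks(3), trunks(5), trunks(12), trunks(13)))
  else
    None   // malformed lines disappear here, no separate filter step needed
}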
val repartitionedRDD = rdd.repartition(100)
repartitionedRDD.cache
repartitionedRDD.count

// pair every account string with a Long id derived from its hashCode,
// since GraphX vertex ids must be Longs
val mappedRDD = repartitionedRDD.map(arg => (
  (arg._1, arg._1.hashCode.toLong),
  (arg._2, arg._2.hashCode.toLong),
  arg._3, arg._4, arg._5, arg._6))

// each transfer contributes its source and target accounts as vertices
val vertices = mappedRDD.flatMap(arg => Array((arg._1._2, arg._1._1), (arg._2._2, arg._2._1))).distinct
vertices.cache
vertices.count
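One caveat worth flagging (my note, not the author's): String.hashCode returns an Int, so two different account strings can collide on the same vertex id and be silently merged into one vertex. A collision-free sketch using zipWithUniqueId instead:

// assign each distinct account string a unique Long id
val accountIds = rdd.flatMap(arg => Array(arg._1, arg._2)).distinct.zipWithUniqueId
val safeVertices = accountIds.map { case (name, id) => (id, name) }
// edges would then be built by joining each record's source and target
// account strings against accountIds, rather than calling hashCode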
import org.apache.spark.graphx._
// one edge per distinct (source id, target id, attribute) triple
val edges = mappedRDD.map(arg => (arg._1._2, arg._2._2, arg._3)).distinct
  .map(arg => Edge(arg._1, arg._2, arg._3))
edges.cache
edges.count
val graph = Graph(vertices, edges)
val rankedGraph = graph.staticPageRank(3)
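As an aside, staticPageRank(3) runs exactly three iterations regardless of convergence; GraphX also offers a convergence-based variant, pageRank(tol), which iterates until the per-vertex change drops below the tolerance. For example:

val convergedGraph = graph.pageRank(0.0001)   // iterate to convergence instead of a fixed count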
rankedGraph.vertices.first

// 0.15 is GraphX's default reset probability; a vertex with no in-edges keeps
// rank 0.15, so this picks the first vertex that actually received inbound rank
rankedGraph.vertices.filter(arg => arg._2 != 0.15).first
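To turn the raw ranks into something readable, a sketch (a hypothetical follow-up, not in the original post) that joins the ranks back to the account strings and lists the ten highest-ranked accounts:

val topAccounts = vertices.join(rankedGraph.vertices)
  .map { case (id, (account, rank)) => (account, rank) }
  .sortBy(_._2, ascending = false)
  .take(10)
topAccounts.foreach(println)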