
Getting Started with GraphX: Learning from the Source Code (Part 1)


Key concepts in GraphX

Graph

1. A Graph has three member views: vertices, edges, and triplets.

2. Each triplet records an edge together with the attributes of its source and destination vertices (a small sketch follows).
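
A minimal sketch of what that means in practice, built on a tiny made-up graph (the user names and relationship labels are hypothetical; only the standard GraphX API is used, and an existing SparkContext `sc` is assumed):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hypothetical vertex and edge data
val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "likes")))
val graph: Graph[String, String] = Graph(users, relationships)

// Each EdgeTriplet exposes the edge attribute and both vertex attributes at once.
graph.triplets
  .map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}")
  .collect()
  .foreach(println)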

Member functions

The functions fall into several categories:

  1. Operations on all vertices or edges that leave the graph structure unchanged, such as mapEdges and mapVertices
  2. Subgraph extraction, analogous to filter on a set: subgraph
  3. Graph partitioning, i.e. the partitionBy operation. This is crucial for Spark: it is the partitions that make parallel processing possible, and different PartitionStrategy choices pay off differently. The most obvious strategy is to hash the whole graph into multiple regions.
  4. outerJoinVertices, the outer join on vertices (a usage sketch of all four categories follows this list)
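
A hedged sketch of the four categories, continuing with the toy graph above (the attribute values are made up):

// 1. Transform vertex or edge attributes without touching the structure
val lengths: Graph[Int, Int] =
  graph.mapVertices((_, name) => name.length).mapEdges(e => e.attr.length)

// 2. Subgraph: keep only the "follows" edges, like a filter on a set
val follows = graph.subgraph(epred = (t: EdgeTriplet[String, String]) => t.attr == "follows")

// 3. Repartition the edges with a PartitionStrategy (a 2D hash partitioning here)
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// 4. Outer join external per-vertex data (outDegrees may not cover every vertex)
val withDeg = graph.outerJoinVertices(graph.outDegrees) {
  (vid, name, degOpt) => (name, degOpt.getOrElse(0))
}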

Graph computations and operations: GraphOps

The common graph algorithms are collected in the GraphOps class, and Graph defines an implicit conversion from Graph to GraphOps. The source is as follows:

/**
* Implicitly extracts the [[GraphOps]] member from a graph.
*
* To improve modularity the Graph type only contains a small set of basic operations.
* All the convenience operations are defined in the [[GraphOps]] class which may be
* shared across multiple graph implementations.
*/
implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
(g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops

The supported operations are listed below (a short usage sketch follows the list):

  1. collectNeighborIds
  2. collectNeighbors
  3. collectEdges
  4. joinVertices
  5. filter
  6. pickRandomVertex
  7. pregel
  8. pageRank
  9. staticPageRank
  10. connectedComponents
  11. triangleCount
  12. stronglyConnectedComponents

The source is as follows:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx

import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.SparkException
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.apache.spark.graphx.lib._

/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
* efficient GraphX API. This class is implicitly constructed for each Graph object.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
*/
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {

/** The number of edges in the graph. */
@transient lazy val numEdges: Long = graph.edges.count()

/** The number of vertices in the graph. */
@transient lazy val numVertices: Long = graph.vertices.count()

/**
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
@transient lazy val inDegrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
@transient lazy val outDegrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
@transient lazy val degrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

/**
* Computes the neighboring vertex degrees.
*
* @param edgeDirection the direction along which to collect neighboring vertex attributes
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
if (edgeDirection == EdgeDirection.In) {
graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.Out) {
graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
} else { // EdgeDirection.Either
graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
TripletFields.None)
}
}

/**
* Collect the neighbor vertex ids for each vertex.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the set of neighboring ids for each vertex
*/
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Either) {
graph.aggregateMessages[Array[VertexId]](
ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
_ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.Out) {
graph.aggregateMessages[Array[VertexId]](
ctx => ctx.sendToSrc(Array(ctx.dstId)),
_ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.In) {
graph.aggregateMessages[Array[VertexId]](
ctx => ctx.sendToDst(Array(ctx.srcId)),
_ ++ _, TripletFields.None)
} else {
throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
nbrsOpt.getOrElse(Array.empty[VertexId])
}
} // end of collectNeighborIds

/**
* Collect the neighbor vertex attributes for each vertex.
*
* @note This function could be highly inefficient on power-law
* graphs where high degree vertices may force a large amount of
* information to be collected to a single location.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring vertex attributes for each vertex
*/
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
val nbrs = edgeDirection match {
case EdgeDirection.Either =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => {
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
(a, b) => a ++ b, TripletFields.All)
case EdgeDirection.In =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
(a, b) => a ++ b, TripletFields.Src)
case EdgeDirection.Out =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
(a, b) => a ++ b, TripletFields.Dst)
case EdgeDirection.Both =>
throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
"EdgeDirection.Either instead.")
}
graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}
} // end of collectNeighbor

/**
* Returns an RDD that contains for each vertex v its local edges,
* i.e., the edges that are incident on v, in the user-specified direction.
* Warning: note that singleton vertices, those with no edges in the given
* direction will not be part of the return value.
*
* @note This function could be highly inefficient on power-law
* graphs where high degree vertices may force a large amount of
* information to be collected to a single location.
*
* @param edgeDirection the direction along which to collect
* the local edges of vertices
*
* @return the local edges for each vertex
*/
def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]] = {
edgeDirection match {
case EdgeDirection.Either =>
graph.aggregateMessages[Array[Edge[ED]]](
ctx => {
ctx.sendToSrc(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr)))
ctx.sendToDst(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr)))
},
(a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.In =>
graph.aggregateMessages[Array[Edge[ED]]](
ctx => ctx.sendToDst(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr))),
(a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.Out =>
graph.aggregateMessages[Array[Edge[ED]]](
ctx => ctx.sendToSrc(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr))),
(a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.Both =>
throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
"EdgeDirection.Either instead.")
}
}

/**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
* should contain at most one entry for each vertex. If no entry is
* provided the map function is skipped and the old value is used.
*
* @tparam U the type of entry in the table of updates
* @param table the table to join with the vertices in the graph.
* The table should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex
* values. The map function is invoked only for vertices with a
* corresponding entry in the table otherwise the old vertex value
* is used.
*
* @example This function is used to update the vertices with new
* values based on external data. For example we could add the out
* degree to each vertex record
*
* {{{
* val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
* .mapVertices((_, _) => 0)
* val outDeg = rawGraph.outDegrees
* val graph = rawGraph.joinVertices[Int](outDeg)
* ((_, _, outDeg) => outDeg)
* }}}
*
*/
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
graph.outerJoinVertices(table)(uf)
}

/**
* Filter the graph by computing some values to filter on, and applying the predicates.
*
* @param preprocess a function to compute new vertex and edge data before filtering
* @param epred edge pred to filter on after preprocess, see more details under
* [[org.apache.spark.graphx.Graph#subgraph]]
* @param vpred vertex pred to filter on after preprocess, see more details under
* [[org.apache.spark.graphx.Graph#subgraph]]
* @tparam VD2 vertex type the vpred operates on
* @tparam ED2 edge type the epred operates on
* @return a subgraph of the original graph, with its data unchanged
*
* @example This function can be used to filter the graph based on some property, without
* changing the vertex and edge values in your program. For example, we could remove the vertices
* in a graph with 0 outdegree
*
* {{{
* graph.filter(
* graph => {
* val degrees: VertexRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
* vpred = (vid: VertexId, deg:Int) => deg > 0
* )
* }}}
*
*/
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}

/**
* Picks a random vertex from the graph and returns its ID.
*/
def pickRandomVertex(): VertexId = {
val probability = 50.0 / graph.numVertices
var found = false
var retVal: VertexId = null.asInstanceOf[VertexId]
while (!found) {
val selectedVertices = graph.vertices.flatMap { vidVvals =>
if (Random.nextDouble() < probability) { Some(vidVvals._1) } else { None }
}
if (selectedVertices.count > 1) {
found = true
val collectedVertices = selectedVertices.collect()
retVal = collectedVertices(Random.nextInt(collectedVertices.size))
}
}
retVal
}

/**
* Convert bi-directional edges into uni-directional ones.
* Some graph algorithms (e.g., TriangleCount) assume that an input graph
* has its edges in canonical direction.
* This function rewrites the vertex ids of edges so that srcIds are smaller
* than dstIds, and merges the duplicated edges.
*
* @param mergeFunc the user defined reduce function which should
* be commutative and associative and is used to combine the output
* of the map phase
*
* @return the resulting graph with canonical edges
*/
def convertToCanonicalEdges(
mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = {
val newEdges =
graph.edges
.map {
case e if e.srcId < e.dstId => ((e.srcId, e.dstId), e.attr)
case e => ((e.dstId, e.srcId), e.attr)
}
.reduceByKey(mergeFunc)
.map(e => new Edge(e._1._1, e._1._2, e._2))
Graph(graph.vertices, newEdges)
}

/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam A the Pregel message type
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}

/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
*
* @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergence]]
*/
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
PageRank.runUntilConvergence(graph, tol, resetProb)
}


/**
* Run personalized PageRank for a given vertex, such that all random walks
* are started relative to the source node.
*
* @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergenceWithOptions]]
*/
def personalizedPageRank(src: VertexId, tol: Double,
resetProb: Double = 0.15) : Graph[Double, Double] = {
PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
}

/**
* Run Personalized PageRank for a fixed number of iterations,
* with all iterations originating at the source node,
* returning a graph with vertex attributes
* containing the PageRank and edge attributes the normalized edge weight.
*
* @see [[org.apache.spark.graphx.lib.PageRank$#runWithOptions]]
*/
def staticPersonalizedPageRank(src: VertexId, numIter: Int,
resetProb: Double = 0.15) : Graph[Double, Double] = {
PageRank.runWithOptions(graph, numIter, resetProb, Some(src))
}

/**
* Run PageRank for a fixed number of iterations returning a graph with vertex attributes
* containing the PageRank and edge attributes the normalized edge weight.
*
* @see [[org.apache.spark.graphx.lib.PageRank$#run]]
*/
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
PageRank.run(graph, numIter, resetProb)
}

/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
}

/**
* Compute the number of triangles passing through each vertex.
*
* @see [[org.apache.spark.graphx.lib.TriangleCount$#run]]
*/
def triangleCount(): Graph[Int, ED] = {
TriangleCount.run(graph)
}

/**
* Compute the strongly connected component (SCC) of each vertex and return a graph with the
* vertex value containing the lowest vertex id in the SCC containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
*/
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
StronglyConnectedComponents.run(graph, numIter)
}
} // end of GraphOps

RDD

RDDs are the core of the Spark stack, so which new RDDs does GraphX introduce?

VertexRDD: VertexRDD[A] extends RDD[(VertexID, A)] with the additional constraint that each VertexID appears only once; it represents a set of vertices whose attribute type is A. Internally this is achieved by keeping the vertex attributes in a reusable hash-map data structure, so two VertexRDDs derived from the same base VertexRDD (e.g. via filter or mapValues) can be joined in constant time without hash lookups. To take advantage of this indexed structure, VertexRDD exposes a number of extra operations. Its source is as follows:

package org.apache.spark.graphx

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.VertexRDDImpl

/**
* Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
* pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be
* joined efficiently. All operations except [[reindex]] preserve the index. To construct a
* `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
*
* Additionally, stores routing information to enable joining the vertex attributes with an
* [[EdgeRDD]].
*
* @example Construct a `VertexRDD` from a plain RDD:
* {{{
* // Construct an initial vertex set
* val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
* val vset = VertexRDD(someData)
* // If there were redundant values in someData we would use a reduceFunc
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
* val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
* val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
* }}}
*
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
abstract class VertexRDD[VD](
sc: SparkContext,
deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {

implicit protected def vdTag: ClassTag[VD]

private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

/**
* Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
firstParent[ShippableVertexPartition[VD]].iterator(part, context).next().iterator
}

/**
* Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
* VertexRDD will be based on a different index and can no longer be quickly joined with this
* RDD.
*/
def reindex(): VertexRDD[VD]

/**
* Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/
private[graphx] def mapVertexPartitions[VD2: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2]

/**
* Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
* preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
* rather than allocating new memory.
*
* It is declared and defined here to allow refining the return type from `RDD[(VertexId, VD)]` to
* `VertexRDD[VD]`.
*
* @param pred the user defined predicate, which takes a tuple to conform to the
* `RDD[(VertexId, VD)]` interface
*/
override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))

/**
* Maps each vertex attribute, preserving the index.
*
* @tparam VD2 the type returned by the map function
*
* @param f the function applied to each value in the RDD
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD
*/
def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]

/**
* Maps each vertex attribute, additionally supplying the vertex ID.
*
* @tparam VD2 the type returned by the map function
*
* @param f the function applied to each ID-value pair in the RDD
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD. The resulting VertexRDD retains the same index.
*/
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

/**
* For each VertexId present in both `this` and `other`, minus will act as a set difference
* operation returning only those unique VertexId's present in `this`.
*
* @param other an RDD to run the set operation against
*/
def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]

/**
* For each VertexId present in both `this` and `other`, minus will act as a set difference
* operation returning only those unique VertexId's present in `this`.
*
* @param other a VertexRDD to run the set operation against
*/
def minus(other: VertexRDD[VD]): VertexRDD[VD]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
* @param other the other RDD[(VertexId, VD)] with which to diff against.
*/
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
* @param other the other VertexRDD with which to diff against.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD]

/**
* Left joins this RDD with another VertexRDD with the same index. This function will fail if
* both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
* each vertex in `this`.
* If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
* @param other the other VertexRDD with which to join.
* @param f the function mapping a vertex id and its attributes in this and the other vertex set
* to a new vertex attribute.
* @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]

/**
* Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
* used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is
* missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates,
* the vertex is picked arbitrarily.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
* @param other the other VertexRDD with which to join
* @param f the function mapping a vertex id and its attributes in this and the other vertex set
* to a new vertex attribute.
* @return a VertexRDD containing all the vertices in this VertexRDD with the attributes emitted
* by `f`.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: RDD[(VertexId, VD2)])
(f: (VertexId, VD, Option[VD2]) => VD3)
: VertexRDD[VD3]

/**
* Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See
* [[innerJoin]] for the behavior of the join.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

/**
* Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation
* is used.
*
* @param other an RDD containing vertices to join. If there are multiple entries for the same
* vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries.
* @param f the join function applied to corresponding values of `this` and `other`
* @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both
* `this` and `other`, with values supplied by `f`
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

/**
* Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
* VertexRDD co-indexed with `this`.
*
* @param messages an RDD containing messages to aggregate, where each message is a pair of its
* target vertex ID and the message data
* @param reduceFunc the associative aggregation function for merging messages to the same vertex
* @return a VertexRDD co-indexed with `this`, containing only vertices that received messages.
* For those vertices, their values are the result of applying `reduceFunc` to all received
* messages.
*/
def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]

/**
* Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
* [[EdgeRDD]].
*/
def reverseRoutingTables(): VertexRDD[VD]

/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
def withEdges(edges: EdgeRDD[_]): VertexRDD[VD]

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[VD2: ClassTag](
partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2]

/**
* Changes the target storage level while preserving all other properties of the
* VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): VertexRDD[VD]

/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
private[graphx] def shipVertexAttributes(
shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])]

/** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])]

} // end of VertexRDD

EdgeRDD: EdgeRDD[ED] extends RDD[Edge[ED]] and organizes its edges into block partitions using one of the strategies defined in PartitionStrategy. Within each partition, edge attributes and the adjacency structure are stored separately, so the structure can be reused as much as possible when attribute values change.

EdgeRDD exposes three additional functions (a usage sketch follows the source); its source is as follows:
abstract class EdgeRDD[ED](
sc: SparkContext,
deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {

// scalastyle:off structural.type
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
// scalastyle:on structural.type

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, _])].iterator(part, context)
if (p.hasNext) {
p.next()._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
* @tparam ED2 the new edge value type
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
def reverse: EdgeRDD[ED]

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
* [[PartitionStrategy]].
*
* @param other the EdgeRDD to join with
* @param f the join function applied to corresponding values of `this` and `other`
* @return a new EdgeRDD containing only edges that appear in both `this` and `other`,
* with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

/**
* Changes the target storage level while preserving all other properties of the
* EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
}
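
A hedged sketch of those three operations, again on the toy graph from the earlier sketch:

val edgeRdd: EdgeRDD[String] = graph.edges

// mapValues: keep the partition structure, change only the attribute
val edgeLengths: EdgeRDD[Int] = edgeRdd.mapValues(e => e.attr.length)

// reverse: flip the direction of every edge
val reversed: EdgeRDD[String] = edgeRdd.reverse

// innerJoin: combine attributes of edges present in both RDDs (same partitioning assumed)
val combined: EdgeRDD[(String, Int)] =
  edgeRdd.innerJoin(edgeLengths)((src, dst, a, b) => (a, b))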

Compared with EdgeRDD, VertexRDD is the more important of the two and supports far more operations, most of which revolve around merging the attributes attached to vertices. Merging inevitably drags in relational algebra and set theory, which is why VertexRDD exposes many SQL-like terms, such as

  • leftJoin
  • innerJoin

For the difference between leftJoin, innerJoin, and outerJoin, a quick web search will do; the source above also documents them in detail, and a small sketch follows.
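
A small sketch of the leftJoin/innerJoin difference, assuming the toy graph from earlier and a hypothetical ages RDD that covers only some of the vertices:

val ages: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 30), (2L, 25)))

// leftJoin keeps every vertex of `this`; a missing entry arrives as None
val withAge = graph.vertices.leftJoin(ages) {
  (vid, name, ageOpt) => (name, ageOpt.getOrElse(-1))
}

// innerJoin keeps only the vertices present on both sides
val onlyKnown = graph.vertices.innerJoin(ages) {
  (vid, name, age) => (name, age)
}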

GraphX in production scenarios

Storing and loading graphs

In a big-data setting, what do we do when the graph is so large that the vertex and edge data no longer fit in a single file? We use HDFS.

Typically, everything related to vertices is kept in one file (vertexFile) and everything related to edges in another (edgeFile).

When building a concrete graph, the edges alone already describe how the vertices are related, and therefore the structure of the graph; a sketch is given below.
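
A minimal sketch of that, assuming hypothetical HDFS paths and a simple tab-separated layout for vertexFile ("vertexId<TAB>name") and edgeFile ("srcId<TAB>dstId<TAB>weight"):

val vertices: RDD[(VertexId, String)] =
  sc.textFile("hdfs:///data/vertexFile").map { line =>
    val fields = line.split("\t")
    (fields(0).toLong, fields(1))
  }

val edges: RDD[Edge[Double]] =
  sc.textFile("hdfs:///data/edgeFile").map { line =>
    val fields = line.split("\t")
    Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
  }

// Vertices that appear only in edgeFile get the default attribute
val graphFromFiles: Graph[String, Double] = Graph(vertices, edges, defaultVertexAttr = "unknown")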

Building a graph

GraphLoader is the GraphX utility dedicated to loading and building graphs; its most important function is edgeListFile. The source is as follows:

package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}

/**
* Provides utilities for loading [[Graph]]s from files.
*/
object GraphLoader extends Logging {

/**
* Loads a graph from an edge list formatted file where each line contains two integers: a source
* id and a target id. Skips lines that begin with `#`.
*
* If desired the edges can be automatically oriented in the positive
* direction (source Id < target Id) by setting `canonicalOrientation` to true.
*
* @example Loads a file in the following format:
* {{{
* # Comment Line
* # Source Id <\t> Target Id
* 1 -5
* 1 2
* 2 7
* 1 8
* }}}
*
* @param sc SparkContext
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param numEdgePartitions the number of partitions for the edge RDD
* Setting this value to -1 will use the default parallelism.
* @param edgeStorageLevel the desired storage level for the edge partitions
* @param vertexStorageLevel the desired storage level for the vertex partitions
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis

// Parse the edge data table directly into edge partitions
val lines =
if (numEdgePartitions > 0) {
sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
} else {
sc.textFile(path)
}
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if (lineArray.length <2) {
throw new IllegalArgumentException("Invalid line: " + line)
}
val srcId = lineArray(0).toLong
val dstId = lineArray(1).toLong
if (canonicalOrientation && srcId > dstId) {
builder.add(dstId, srcId, 1)
} else {
builder.add(srcId, dstId, 1)
}
}
}
Iterator((pid, builder.toEdgePartition))
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile

}
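
A typical call, with a hypothetical edge-list file on HDFS:

// Each line is "srcId <whitespace> dstId"; lines starting with '#' are skipped.
val webGraph: Graph[Int, Int] = GraphLoader.edgeListFile(
  sc,
  "hdfs:///data/web-edges.txt",
  canonicalOrientation = true, // rewrite edges so that srcId < dstId
  numEdgePartitions = 8)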

PageRank

What is PageRank?

PageRank is Google's proprietary algorithm for measuring how important a given web page is relative to the other pages in the search engine's index. It was invented by Larry Page and Sergey Brin in the late 1990s. PageRank turned link value into a ranking factor: a link to a page is treated as a vote for its importance.

The core idea of PageRank

In words, the analysis proceeds as follows:

  1. The relationships between web pages are represented as a graph
  2. A link from page A to page B represents the probability that an arbitrary user jumps from page A to page B
  3. The ranks of all pages are represented by a one-dimensional vector B

The links between all pages are represented by a matrix A, and the ranks of all pages by the vector B; the sketch below makes this concrete.
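
A minimal sketch (plain Scala, no Spark) of PageRank as repeated matrix-vector multiplication; the 3-page link matrix is made up, and a(i)(j) is the probability of jumping from page j to page i, so each column sums to 1:

val resetProb = 0.15
val a = Array(
  Array(0.0, 0.5, 1.0),
  Array(0.5, 0.0, 0.0),
  Array(0.5, 0.5, 0.0))
val n = a.length
var b = Array.fill(n)(1.0 / n) // initial rank vector B
for (_ <- 1 to 20) {
  // B(k+1) = resetProb / n + (1 - resetProb) * A * B(k)
  b = Array.tabulate(n) { i =>
    resetProb / n + (1.0 - resetProb) * (0 until n).map(j => a(i)(j) * b(j)).sum
  }
}
println(b.mkString(", "))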

How PageRank is optimized

The rank computation ultimately reduces to matrix multiplication, and matrix multiplication can be parallelized; Spark does this on top of the Pregel model. The functions defined in PageRank are as follows:

object PageRank extends Logging {


/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int,
resetProb: Double = 0.15): Graph[Double, Double] =
{
runWithOptions(graph, numIter, resetProb)
}

/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
val personalized = srcId.isDefined
val src: VertexId = srcId.getOrElse(-1L)

// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute resetProb.
// When running personalized pagerank, only the source vertex
// has an attribute resetProb. All others are set to 0.
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
.mapVertices { (id, attr) =>
if (!(id != src && personalized)) resetProb else 0.0
}

def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }

var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
rankGraph.cache()

// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.aggregateMessages[Double](
ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)

// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
val rPrb = if (personalized) {
(src: VertexId , id: VertexId) => resetProb * delta(src, id)
} else {
(src: VertexId, id: VertexId) => resetProb
}

rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
logInfo(s"PageRank finished iteration $iteration.")
prevRankGraph.vertices.unpersist(false)
prevRankGraph.edges.unpersist(false)

iteration += 1
}

rankGraph
}

/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param tol the tolerance allowed at convergence (smaller => more accurate).
* @param resetProb the random reset probability (alpha)
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
{
runUntilConvergenceWithOptions(graph, tol, resetProb)
}

/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param tol the tolerance allowed at convergence (smaller => more accurate).
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
val personalized = srcId.isDefined
val src: VertexId = srcId.getOrElse(-1L)

// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[(Double, Double), Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) {
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
.mapVertices { (id, attr) =>
if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
}
.cache()

// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}

def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
var teleport = oldPR
val delta = if (src==id) 1.0 else 0.0
teleport = oldPR*delta

val newPR = teleport + (1.0 - resetProb) * msgSum
val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
(newPR, newDelta)
}

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
Iterator.empty
}
}

def messageCombiner(a: Double, b: Double): Double = a + b

// The initial message received by all vertices in PageRank
val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)

// Execute a dynamic version of Pregel.
val vp = if (personalized) {
(id: VertexId, attr: (Double, Double), msgSum: Double) =>
personalizedVertexProgram(id, attr, msgSum)
} else {
(id: VertexId, attr: (Double, Double), msgSum: Double) =>
vertexProgram(id, attr, msgSum)
}

Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vp, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
} // end of deltaPageRank
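
With all of the above in place, running PageRank from user code is a one-liner on GraphOps, shown here on the webGraph loaded in the GraphLoader sketch:

// Dynamic version: iterate until the per-vertex change drops below the tolerance
val dynamicRanks = webGraph.pageRank(tol = 0.0001).vertices

// Static version: run exactly 20 iterations
val staticRanks = webGraph.staticPageRank(20).vertices

dynamicRanks.collect().sortBy(p => -p._2).take(10).foreach(println)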


This post has already gotten quite long; I will continue in the next post, so stay tuned!


