1. Spark GraphX overview

GraphX is the Spark component for representing graphs and performing parallel graph computation. GraphX extends the RDD abstraction with the Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of basic operators (e.g., mapVertices, mapEdges, and subgraph) as well as an optimized variant of the Pregel API. In addition, a growing collection of graph algorithms and builders is included to simplify graph analysis tasks. GraphX optimizes how vertex and edge data are stored, which greatly improves performance compared with a naive RDD-based implementation, approaching or matching the performance of GraphLab and other dedicated graph computing platforms. GraphX's greatest contribution is to provide a full-stack data solution on top of Spark that can easily and efficiently handle an entire pipeline of graph computations.
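As a quick illustration of the basic operators named above, here is a minimal, hedged sketch using the small airport graph from the example below (the object name is made up for illustration; only the GraphX calls themselves come from the API):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.{SparkConf, SparkContext}

object OperatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OperatorSketch").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    val graph: Graph[String, Int] = Graph(
      sc.makeRDD(Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))),
      sc.makeRDD(Array(Edge(1L, 2L, 1800), Edge(2L, 3L, 800), Edge(3L, 1L, 1400))))

    // mapVertices: transform vertex attributes; the graph structure is unchanged
    val lowerCased = graph.mapVertices((_, name) => name.toLowerCase)

    // mapEdges: transform edge attributes, e.g. convert miles to kilometres
    val inKm = graph.mapEdges(e => e.attr * 1.609)

    // subgraph: keep only the edges (and vertices) that satisfy a predicate
    val longRoutes = graph.subgraph(epred = t => t.attr > 1000)

    lowerCased.vertices.foreach(println)
    inKm.edges.foreach(println)
    longRoutes.triplets.foreach(println)
  }
}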

The graph computation model:

Basic graph computation is based on BSP (Bulk Synchronous Parallel), which divides a computation into a series of superstep iterations. Vertically it is a serial model, while horizontally it is a parallel one. A barrier, i.e., a global synchronization point, sits between every two supersteps; the next superstep starts only after all parallel computation in the current one has completed.

Each superstep consists of three parts:
- compute: each processor performs local computation using the messages from the previous superstep and its local data;
- message delivery: when a processor finishes its computation, it sends messages to the other processors associated with it;
- global synchronization: the barrier ensures that all computation and messaging have completed before the next superstep begins.
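This superstep cycle is exactly what GraphX's Pregel API exposes: the vertex program is the local compute phase, the send-message function is the message-delivery phase, and the framework takes care of the barrier between supersteps. A minimal sketch, assuming the small airport graph used in the example below and computing single-source shortest paths from vertex 1 (the object name is illustrative):

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object PregelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PregelSketch").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    val graph: Graph[String, Int] = Graph(
      sc.makeRDD(Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))),
      sc.makeRDD(Array(Edge(1L, 2L, 1800), Edge(2L, 3L, 800), Edge(3L, 1L, 1400))))

    val sourceId: VertexId = 1L
    // Initial state: distance 0 at the source, infinity everywhere else
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      // compute: each vertex keeps the smaller of its current distance and the incoming message
      (_, dist, newDist) => math.min(dist, newDist),
      // message delivery: propagate an improved distance along an edge
      triplet =>
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      // merge messages arriving at the same vertex before the next superstep
      (a, b) => math.min(a, b))

    sssp.vertices.collect().foreach(println)
  }
}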

2. A worked example

The graph data:

## Vertex data (id, name)
1, "SFO"
2, "ORD"
3, "DFW"
## Edge data (srcId, dstId, distance)
1, 2, 1800
2, 3, 800
3, 1, 1400

Compute all vertices, all edges, all triplets, the number of vertices, the number of edges, and the edges whose distance is greater than 1000; then sort the edges by distance and output them in descending order.

Code implementation

package com.hoult.Streaming.work

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object GraphDemo {
  def main(args: Array[String]): Unit = {
    // Initialize the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // Initialize the data
    val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(1L, 2L, 1800),
      Edge(2L, 3L, 800),
      Edge(3L, 1L, 1400))

    // Construct vertexRDD and edgeRDD
    val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // Construct the graph
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

    // All vertices
    graph.vertices.foreach(println)

    // All edges
    graph.edges.foreach(println)

    // All the triplets
    graph.triplets.foreach(println)

    // Count the vertices
    val vertexCnt = graph.vertices.count()
    println(Number of s" vertices:$vertexCnt")

    // Number of edges
    val edgeCnt = graph.edges.count()
    println(s"Number of edges: $edgeCnt")

    // Routes with distance greater than 1000
    graph.edges.filter(_.attr > 1000).foreach(println)

    // Sort by distance between all airports (descending order)
    graph.edges.sortBy(-_.attr).collect().foreach(println)
  }
}


The output

3. Some notes on graph computation

The example above is demo-level; in a real production environment the graphs you work with will be far more complicated, but a given scenario generally follows the same basic pattern. One thing to pay particular attention to in graph computation is caching. RDDs are not kept in memory by default, so when data is reused in iterative computation you should cache it explicitly to get the best performance, and you may also need to uncache it later. By default, cached RDDs and graphs stay in memory until memory pressure forces them out according to an LRU (least recently used) policy. In iterative computation, the intermediate results of earlier iterations gradually fill memory; they are eventually evicted, but keeping unnecessary data in memory slows down garbage collection. It is therefore more efficient to uncache intermediate results as soon as they are no longer needed. This means materializing (caching) the graph or RDD produced in each iteration, uncaching all other data sets, and using only the materialized data sets in subsequent iterations. However, because a graph is composed of multiple RDDs, it is difficult to unpersist it correctly by hand. For iterative computation, the Pregel API is recommended, as it unpersists intermediate results correctly. For more details, consult the documentation.
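A minimal sketch of the cache-then-uncache pattern described above (the per-iteration update step and names are purely illustrative; for real iterative jobs the Pregel API handles this for you):

import org.apache.spark.graphx.Graph

object CachingSketch {
  // Illustrative iterative loop: cache the new graph, force it to materialize,
  // then unpersist the previous iteration's graph so it does not linger in memory.
  def iterate(initial: Graph[Double, Int], iterations: Int): Graph[Double, Int] = {
    var g = initial.cache()
    for (_ <- 1 to iterations) {
      val next = g.mapVertices((_, v) => v * 0.85).cache() // hypothetical per-iteration update
      next.vertices.count()                                // materialize before dropping the old graph
      next.edges.count()
      g.unpersist(blocking = false)                        // a graph is several RDDs, so uncaching it
      g = next                                             // correctly is tricky; Pregel does this for you
    }
    g
  }
}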