Introduction to Apache Spark GraphX

Every time we learn a new topic, it's important to start from the basics. We couldn't learn a new language without knowing the order of subjects and verbs in a sentence. The same rule applies to Apache Spark's GraphX module, which will be covered in this category. But before going into details, we'll focus on its basics.

This post is composed of two sections. The first one summarizes the main points about GraphX. The second briefly presents an alternative project called GraphFrames.

Generally about GraphX

GraphX is Apache Spark's library for graph data processing. As was previously the case for DStream-based streaming, it introduces a new data representation abstraction on top of RDDs called... Graph. This structure is composed of two other RDD-based abstractions: VertexRDD and EdgeRDD. It's not difficult to deduce, then, that GraphX benefits from all the important RDD properties: immutability, distributed character, fault tolerance and caching ability.

Naturally, GraphX comes with an RDD-based computation engine. It offers a lot of the transformations known from RDDs, such as map, filter or group by. Aside from that, GraphX offers graph-specific operations, such as subgraph creation or neighbor collection. The computation model implements the Pregel approach, where vertices compute their values in stages called supersteps. In GraphX's implementation, communication is allowed only between vertices and their direct neighbors.
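To make the Pregel model more concrete, here is a minimal sketch of single-source shortest paths written with GraphX's pregel operator. The graph and the source vertex are arbitrary illustration values; the three function arguments correspond to the vertex program, the message sending logic and the message merging logic executed at each superstep:

```scala
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object PregelSketch extends App {
  val sc = SparkContext.getOrCreate(
    new SparkConf().setAppName("Pregel sketch").setMaster("local"))

  // A small illustrative graph: edge attributes are distances
  val edges = sc.parallelize(Seq(
    Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
  val graph = Graph.fromEdges(edges, defaultValue = Double.PositiveInfinity)

  val sourceId: VertexId = 1L
  // Initialize distances: 0 for the source, infinity elsewhere
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  val shortestPaths = initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smallest distance seen so far
    (id, dist, newDist) => math.min(dist, newDist),
    // Send message: propagate only when a shorter path is found
    triplet => {
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else Iterator.empty
    },
    // Merge messages: keep the minimum of concurrent messages
    (a, b) => math.min(a, b)
  )

  // Vertex 3 is reached with distance 3.0 (via 1 -> 2 -> 3), not 5.0
  shortestPaths.vertices.collect().sortBy(_._1).foreach(println)
}
```

Note how each superstep only exchanges messages along existing edges, which is exactly the neighbor-only communication constraint described above.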

The graph stored during the computation is a directed multigraph. Each edge and vertex can have one or more properties assigned, which may later be used in the computation. The following example should give more context about the similarities with RDD-based processing:

"simple relationship-based code" should "show basic GraphX features" in {
  val sparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("GraphX test").setMaster("local"))
  val parentOfRelationship = "is parent of"

  val people: RDD[(VertexId, String)] =
    sparkContext.parallelize(Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"), (5L, "user5")))
  val relationships: RDD[Edge[String]] =
    sparkContext.parallelize(Seq(Edge(1L, 2L, parentOfRelationship), Edge(1L, 3L, parentOfRelationship), Edge(2L, 3L, parentOfRelationship),
      Edge(3L, 4L, parentOfRelationship)))
  val graph = Graph(people, relationships)

  // Keep only "is parent of" edges and group them by their source vertex
  val groupedParents = graph.edges.filter(edge => edge.attr == parentOfRelationship)
      .groupBy(edge => edge.srcId).collect()

  // Build a map of parent vertex id -> ids of its children
  val mappedVertices = groupedParents.map(vertexWithEdges => {
    val edges = vertexWithEdges._2
    val destinationVertices = edges.map(edge => edge.dstId)
    (vertexWithEdges._1, destinationVertices)
  }).toMap

  mappedVertices should have size 3
  mappedVertices(1) should have size 2
  mappedVertices(1) should contain allOf(2L, 3L)
  mappedVertices(2) should have size 1
  mappedVertices(2) should contain only 3L
  mappedVertices(3) should have size 1
  mappedVertices(3) should contain only 4L
}
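The grouping above collects the edges to the driver before mapping them. The same "children per parent" aggregation could also be sketched with GraphX's aggregateMessages operator, which keeps the work distributed. A minimal sketch, assuming the same relationships as in the test above:

```scala
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object AggregateMessagesSketch extends App {
  val sc = SparkContext.getOrCreate(
    new SparkConf().setAppName("aggregateMessages sketch").setMaster("local"))

  val relationships = sc.parallelize(Seq(
    Edge(1L, 2L, "is parent of"), Edge(1L, 3L, "is parent of"),
    Edge(2L, 3L, "is parent of"), Edge(3L, 4L, "is parent of")))
  val graph = Graph.fromEdges(relationships, defaultValue = "user")

  // Count the children of every parent without collecting edges to the driver
  val childrenPerParent: VertexRDD[Int] = graph.aggregateMessages[Int](
    ctx => ctx.sendToSrc(1), // each edge sends 1 to its source (the parent)
    _ + _                    // merge: sum the messages received per vertex
  )

  // Vertex 1 has 2 children, vertices 2 and 3 have 1 child each
  childrenPerParent.collect().sortBy(_._1).foreach(println)
}
```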

GraphFrames alternative

The already mentioned DStream-based streaming was replaced by the optimized Structured Streaming library. GraphX seems to share the same destiny - but as of this writing, it's not official. The project assumed to replace GraphX is GraphFrames and, as we can easily deduce from the name, it uses the DataFrame abstraction to represent graph data.

From a bird's eye view, GraphFrames is very similar to GraphX. Even the official documentation states that "if you are familiar with GraphX, then GraphFrames will be easy to learn". Globally, the operations expressed with GraphX's RDD abstraction are defined with the DataFrame abstraction. However, not all GraphX features are implemented yet in GraphFrames, and that's the main reason for keeping the project outside of Apache Spark's core.

To give you a better idea of the above, let's rewrite the code from the previous section with GraphFrames:

"simple relationship-based code" should "be written with GraphFrames" in {
  val sparkSession = SparkSession.builder().appName("GraphFrames example").master("local").getOrCreate()
  val parentOfRelationship = "is parent of"
  import sparkSession.implicits._
  val people = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"), (5L, "user5")).toDF("id", "name")
  val relationships = Seq((1L, 2L, parentOfRelationship), (1L, 3L, parentOfRelationship), (2L, 3L, parentOfRelationship),
    (3L, 4L, parentOfRelationship)).toDF("src", "dst", "label")
  val graph = GraphFrame(people, relationships)

  // Keep only "is parent of" edges; filterEdges takes a SQL expression
  val filteredGraph = graph.filterEdges(s"label = '${parentOfRelationship}'")
  val edgesGroupedBySource = filteredGraph.edges.collect().groupBy(row => row.getAs[Long]("src"))

  // Build a map of parent vertex id -> ids of its children
  val mappedVertices = edgesGroupedBySource.map(vertexWithEdges => {
    val edges = vertexWithEdges._2
    val destinationVertices = edges.map(edge => edge.getAs[Long]("dst"))
    (vertexWithEdges._1, destinationVertices)
  })

  mappedVertices should have size 3
  mappedVertices(1) should have size 2
  mappedVertices(1) should contain allOf(2L, 3L)
  mappedVertices(2) should have size 1
  mappedVertices(2) should contain only 3L
  mappedVertices(3) should have size 1
  mappedVertices(3) should contain only 4L
}
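One GraphFrames capability worth mentioning, with no direct GraphX equivalent, is motif finding: matching structural patterns in the graph with a small pattern language. A hedged sketch, reusing the same illustrative "is parent of" data to find grandparent pairs:

```scala
import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

object MotifSketch extends App {
  val spark = SparkSession.builder()
    .appName("Motif finding sketch").master("local").getOrCreate()
  import spark.implicits._

  val people = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"))
    .toDF("id", "name")
  val relations = Seq((1L, 2L, "is parent of"), (2L, 3L, "is parent of"),
    (3L, 4L, "is parent of")).toDF("src", "dst", "label")
  val graph = GraphFrame(people, relations)

  // Match two chained "is parent of" edges: a -> b -> c
  val grandparents = graph
    .find("(a)-[e1]->(b); (b)-[e2]->(c)")
    .filter($"e1.label" === "is parent of" && $"e2.label" === "is parent of")

  // For the chain 1 -> 2 -> 3 -> 4 this yields (user1, user3) and (user2, user4)
  grandparents.select($"a.name".as("grandparent"), $"c.name".as("grandchild")).show()
}
```

The pattern string is translated into DataFrame joins under the hood, so the query benefits from the same Catalyst optimizations as any other DataFrame operation.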

This short article introduced some basic points about graph data processing in Apache Spark with the GraphX library. The first part briefly presented the RDD-based data abstraction and gave a small code example. The second section introduced GraphFrames - an alternative project that should one day be integrated into Apache Spark's core as a GraphX replacement.

Read also about Apache Spark GraphX here: GraphX Programming Guide.
