Loading and saving graphs in Apache Spark GraphX

Versions: Apache Spark GraphX 2.4.0 https://github.com/bartosz25/spark-...aitingforcode/graphx/loadingwriting

Until now we've been working only with in-memory graphs. However, Apache Spark GraphX provides much more convenient, production-ready methods to load and save them, and this post presents them.

The post is divided into 2 parts. The first one covers loading graphs. The second one presents the opposite activity: persisting them.

Loading graphs

The easiest way to load graphs into GraphX is the utility class called GraphLoader. It comes with an edgeListFile(sc: SparkContext, path: String, canonicalOrientation: Boolean, numEdgePartitions: Int, edgeStorageLevel: StorageLevel, vertexStorageLevel: StorageLevel) method that lets us create a Graph instance from files. Each line of the file must contain either an edge ("sourceId destinationId"), where the vertex ids are separated by at least one whitespace character, or a comment beginning with "#", which is ignored:

  private val EdgesFile = new File("/tmp/graphx-loading/edges.txt")

  before {
    val edges =
      """
        |1  3
        |1 2
        |2  5""".stripMargin
    FileUtils.writeStringToFile(EdgesFile, edges)
  }

  after {
    EdgesFile.delete()
  }

  "GraphLoader" should "load graph from a file" in {
    val testSparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("GraphX graph loading")
      .setMaster("local[*]"))
    val graph = GraphLoader.edgeListFile(testSparkContext, EdgesFile.getPath)

    val vertices = graph.vertices.collect().map {
      case (vertexId, attrs) => s"(${vertexId})[${attrs}]"
    }
    val edges = graph.edges.collect().map(edge => s"(${edge.srcId})--[${edge.attr}]-->(${edge.dstId})")

    vertices should have size 4
    vertices should contain allOf("(2)[1]", "(1)[1]", "(3)[1]", "(5)[1]")
    edges should have size 3
    edges should contain allOf("(1)--[1]-->(2)", "(1)--[1]-->(3)", "(2)--[1]-->(5)")
  }
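The line format described above can be illustrated without Spark. The snippet below is only a sketch: EdgeListLineParser is a hypothetical helper, not a GraphX class, and it mirrors the parsing rules as I read them in the GraphX source (empty lines and "#" comments skipped, ids split on whitespace, source and destination swapped under canonical orientation when the source id is the greater one).

```scala
// Hypothetical helper, not part of GraphX: it only mirrors the line rules
// described for GraphLoader.edgeListFile.
object EdgeListLineParser {

  /** Returns Some((sourceId, destinationId)) for an edge line, None for a skipped line. */
  def parse(line: String, canonicalOrientation: Boolean = false): Option[(Long, Long)] = {
    if (line.isEmpty || line.startsWith("#")) {
      None // empty lines and comments are ignored
    } else {
      val tokens = line.split("\\s+")
      require(tokens.length >= 2, s"Invalid line: $line")
      val (sourceId, destinationId) = (tokens(0).toLong, tokens(1).toLong)
      // With canonical orientation the smaller id always becomes the source
      if (canonicalOrientation && sourceId > destinationId) Some((destinationId, sourceId))
      else Some((sourceId, destinationId))
    }
  }
}
```

With this helper, EdgeListLineParser.parse("1  3") gives Some((1, 3)), a line starting with "#" gives None, and EdgeListLineParser.parse("5 2", canonicalOrientation = true) gives Some((2, 5)).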

Under the hood, the above method uses the RDD's text file data source, so it creates the graph in a distributed manner. However, as you can see, it's not very flexible because we can set attributes neither for vertices nor for edges: all of them get the value 1. Fortunately, GraphX is based on the low-level RDD API. Thanks to that, we can extend the set of graph loading methods even to Dataset-based data sources:

  "Dataset" should "be used to create a Graph from JSON" in {
    val sparkSession: SparkSession = SparkSession.builder().appName("Graph from JSON").master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._
    val edgesDataset = Seq(
      (1, 2, "A"), (1, 3, "B"), (3, 4, "A")
    ).toDF("sourceId", "destinationId", "attributes")
    val verticesDataset = Seq(
      (1, "vertex1"), (2, "vertex2"), (3, "vertex3")
    ).toDF("vertexId", "attribute")

    edgesDataset.write.mode(SaveMode.Overwrite).json(EdgesDatasetFile.getAbsolutePath)
    verticesDataset.write.mode(SaveMode.Overwrite).json(VerticesDatasetFile.getAbsolutePath)

    val edgesFromDataset = sparkSession.read.json(EdgesDatasetFile.getAbsolutePath).mapPartitions(edgesRows => {
      edgesRows.map(edgeRow => {
        Edge(edgeRow.getAs[Long]("sourceId"), edgeRow.getAs[Long]("destinationId"), edgeRow.getAs[String]("attributes"))
      })
    }).rdd
    val verticesFromDataset = sparkSession.read.json(VerticesDatasetFile.getAbsolutePath).mapPartitions(vertices => {
      vertices.map(vertexRow => (vertexRow.getAs[Long]("vertexId"), vertexRow.getAs[String]("attribute")))
    }).rdd

    val graphFromEdges = Graph(verticesFromDataset, edgesFromDataset, "?")

    val vertices = graphFromEdges.vertices.collect().map {
      case (vertexId, attrs) => s"(${vertexId})[${attrs}]"
    }
    vertices should have size 4
    vertices should contain allOf("(1)[vertex1]", "(2)[vertex2]", "(3)[vertex3]", "(4)[?]")
    val edges = graphFromEdges.edges.collect().map(edge => s"(${edge.srcId})--[${edge.attr}]-->(${edge.dstId})")
    edges should have size 3
    edges should contain allOf("(1)--[A]-->(2)", "(3)--[A]-->(4)", "(1)--[B]-->(3)")
    sparkSession.close()
  }

The code above uses the same logic as the one implemented by GraphLoader, except that it applies it through the Dataset API. Thanks to that, our graph can be stored in different formats (CSV, JSON, text, RDBMS, ...) and hence be processed or visualized in many other places. For instance, we could store the graph in the JSON format used by one of the JavaScript-based graph visualization tools (Alchemy.js, D3.js, ...) and share the same structure for processing and visualization. Moreover, Apache Spark SQL offers a wide range of data sources, including modern cloud services.
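The same idea can be sketched for a CSV source. The object name CsvGraphLoadingSketch, the column names and the temporary file are assumptions made for the illustration. The schema is declared explicitly so that the ids are read as Long values, and the rows are mapped after switching to the RDD API:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical object, written only to illustrate the CSV case
object CsvGraphLoadingSketch {

  // Explicit schema: ids must be Long because GraphX vertex ids are Long values
  private val EdgesSchema = StructType(Seq(
    StructField("sourceId", LongType),
    StructField("destinationId", LongType),
    StructField("attributes", StringType)
  ))

  def loadGraph(sparkSession: SparkSession, edgesCsvPath: String): Graph[String, String] = {
    val edges = sparkSession.read
      .option("header", "true")
      .schema(EdgesSchema)
      .csv(edgesCsvPath)
      .rdd // switch to the RDD API before building GraphX objects
      .map(row => Edge(row.getAs[Long]("sourceId"), row.getAs[Long]("destinationId"),
        row.getAs[String]("attributes")))
    // Vertices are derived from the edges and all receive the same default attribute
    Graph.fromEdges(edges, "?")
  }

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("Graph from CSV").master("local[*]")
      .getOrCreate()
    val edgesFile = Files.createTempFile("edges", ".csv")
    Files.write(edgesFile,
      "sourceId,destinationId,attributes\n1,2,A\n1,3,B\n3,4,A\n".getBytes(StandardCharsets.UTF_8))

    val graph = loadGraph(sparkSession, edgesFile.toString)
    graph.edges.collect()
      .foreach(edge => println(s"(${edge.srcId})--[${edge.attr}]-->(${edge.dstId})"))
    sparkSession.stop()
  }
}
```

Graph.fromEdges is used here because the sketch has no vertices file. If vertex attributes are stored separately, they can be loaded the same way and passed to Graph(vertices, edges, defaultAttribute), as in the JSON test.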

Graph as an output

Why would we need a graph as an output? First, graph processing can be a part of a batch job that makes the data structure evolve at each execution, for instance daily PageRank updates. Also, the input graph could generate two or more subgraphs, which is useful in all kinds of clustering algorithms such as connected components. We could use the graph to find patterns and solve one of the graph mining problems. Finally, the graph could also be processed to generate a Machine Learning model.

Where could the graph be stored? Since GraphX uses the RDD abstraction, graph parts (vertices, edges, triplets) can be persisted to files either as text or as objects:

  "graph edges" should "be saved to an object file" in {
    val edges = TestSparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C")))

    val graph = Graph.fromEdges(edges, "...")

    graph.edges.saveAsObjectFile(TargetObjectFile.getAbsolutePath)

    val edgesFromFile = TestSparkContext.objectFile[Edge[String]](TargetObjectFile.getAbsolutePath)

    val collectedEdges = edgesFromFile.collect()
      .map(edge => s"${edge.srcId}-[${edge.attr}]->${edge.dstId}")
    collectedEdges should have size 3
    collectedEdges should contain allOf("1-[B]->3", "1-[A]->2", "2-[C]->4")
  }

As you can see, the above solution is very straightforward. All edges are saved with the RDD's saveAsObjectFile method, and the same works with saveAsTextFile. However, a downside of that solution is the need to store vertices and edges in separate files. We could eliminate this problem by writing triplets instead of edges and vertices:

  "graph triplets" should "be saved to a text file" in {
    val edges = TestSparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C")))
    val vertices = TestSparkContext.parallelize(Seq((1L, "AA"), (2L, "BB"), (3L, "CC"), (4L, "DD")))

    val graph = Graph(vertices, edges)

    graph.triplets.saveAsTextFile(TargetTripletsDir.getAbsolutePath)

    val tripletsFromFile = TestSparkContext.textFile(TargetTripletsDir.getAbsolutePath)

    val collectedTriplets = tripletsFromFile.collect()
    collectedTriplets should have size 3
    collectedTriplets should contain allOf("((1,AA),(3,CC),B)", "((1,AA),(2,BB),A)", "((2,BB),(4,DD),C)")
  }

Unfortunately, the use of triplets has a major drawback: it repeats the attributes of the vertices in every edge. It may not look scary in the test case, where the attribute is composed of 2 letters. However, for more complex values, for instance JSON documents, duplicating the information in every relationship may become inefficient.
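For comparison, here is a minimal sketch of the two-file alternative, where every vertex attribute is written once and the graph is rebuilt at reading. SeparateFilesRoundTripSketch, its methods and the directory layout are hypothetical names introduced for the example:

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Hypothetical object, written only to illustrate the vertices + edges round trip
object SeparateFilesRoundTripSketch {

  def save(graph: Graph[String, String], verticesPath: String, edgesPath: String): Unit = {
    // Unlike with triplets, each vertex attribute is stored exactly once
    graph.vertices.saveAsObjectFile(verticesPath)
    graph.edges.saveAsObjectFile(edgesPath)
  }

  def load(sparkContext: SparkContext, verticesPath: String,
           edgesPath: String): Graph[String, String] = {
    val vertices = sparkContext.objectFile[(VertexId, String)](verticesPath)
    val edges = sparkContext.objectFile[Edge[String]](edgesPath)
    Graph(vertices, edges)
  }

  def main(args: Array[String]): Unit = {
    val sparkContext = SparkContext.getOrCreate(
      new SparkConf().setAppName("Graph round trip").setMaster("local[*]"))
    // The target directories must not exist yet, hence the fresh temporary parent
    val baseDir = Files.createTempDirectory("graphx-round-trip").toString
    val graph = Graph(
      sparkContext.parallelize(Seq((1L, "AA"), (2L, "BB"), (3L, "CC"), (4L, "DD"))),
      sparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C"))))

    save(graph, s"$baseDir/vertices", s"$baseDir/edges")
    val restoredGraph = load(sparkContext, s"$baseDir/vertices", s"$baseDir/edges")
    restoredGraph.triplets.collect().foreach(println)
    sparkContext.stop()
  }
}
```

The price of this layout is the one already mentioned: two outputs to manage instead of one, and a graph reconstruction step at reading time.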

As with loading, the Dataset abstraction can also help with persisting graphs. The toDF() method, brought by sparkSession.implicits._, converts an RDD into a DataFrame and hence enables writing the graph to other formats like JSON, CSV, Parquet and so forth:

  "graph triplets" should "be saved to a JSON file" in {
    val sparkSession: SparkSession = SparkSession.builder().appName("Graph to JSON").master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._
    val edges = TestSparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C")))
    val vertices = TestSparkContext.parallelize(Seq((1L, "AA"), (2L, "BB"), (3L, "CC"), (4L, "DD")))

    val graph = Graph(vertices, edges)
    val tripletsDataFrame = graph.triplets
      .map(triplet => (triplet.srcId, triplet.srcAttr, triplet.dstId, triplet.dstAttr, triplet.attr))
      .toDF("srcId", "srcAttr", "dstId", "dstAttr", "edgeAttr")
    tripletsDataFrame.write.json(TargetJsonDir.getAbsolutePath)

    val tripletsFromFileAsJson = TestSparkContext.textFile(TargetJsonDir.getAbsolutePath)

    val collectedTriplets = tripletsFromFileAsJson.collect()
    collectedTriplets should have size 3
    collectedTriplets should contain allOf("{\"srcId\":1,\"srcAttr\":\"AA\",\"dstId\":2,\"dstAttr\":\"BB\",\"edgeAttr\":\"A\"}",
      "{\"srcId\":2,\"srcAttr\":\"BB\",\"dstId\":4,\"dstAttr\":\"DD\",\"edgeAttr\":\"C\"}",
      "{\"srcId\":1,\"srcAttr\":\"AA\",\"dstId\":3,\"dstAttr\":\"CC\",\"edgeAttr\":\"B\"}")
  }
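The triplets are not the only thing that can go through toDF(). As a sketch, under the assumption that vertices and edges are kept as two separate Parquet datasets, the snippet below writes them and rebuilds the graph afterwards. ParquetGraphSinkSketch, the column names and the directory layout are hypothetical:

```scala
import java.nio.file.Files

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical object, written only to illustrate a Parquet sink for a graph
object ParquetGraphSinkSketch {

  def save(sparkSession: SparkSession, graph: Graph[String, String], baseDir: String): Unit = {
    import sparkSession.implicits._
    // Map to plain tuples first so that toDF() can derive the columns
    graph.vertices.map { case (vertexId, attribute) => (vertexId, attribute) }
      .toDF("vertexId", "attribute")
      .write.mode(SaveMode.Overwrite).parquet(s"$baseDir/vertices")
    graph.edges.map(edge => (edge.srcId, edge.dstId, edge.attr))
      .toDF("sourceId", "destinationId", "attribute")
      .write.mode(SaveMode.Overwrite).parquet(s"$baseDir/edges")
  }

  def load(sparkSession: SparkSession, baseDir: String): Graph[String, String] = {
    val vertices = sparkSession.read.parquet(s"$baseDir/vertices").rdd
      .map(row => (row.getAs[Long]("vertexId"), row.getAs[String]("attribute")))
    val edges = sparkSession.read.parquet(s"$baseDir/edges").rdd
      .map(row => Edge(row.getAs[Long]("sourceId"), row.getAs[Long]("destinationId"),
        row.getAs[String]("attribute")))
    Graph(vertices, edges)
  }

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("Graph to Parquet").master("local[*]")
      .getOrCreate()
    val sparkContext = sparkSession.sparkContext
    val graph = Graph(
      sparkContext.parallelize(Seq((1L, "AA"), (2L, "BB"), (3L, "CC"), (4L, "DD"))),
      sparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C"))))
    val baseDir = Files.createTempDirectory("graphx-parquet").toString

    save(sparkSession, graph, baseDir)
    load(sparkSession, baseDir).triplets.collect().foreach(println)
    sparkSession.stop()
  }
}
```

Since Parquet stores the schema with the data, the ids come back as Long values without any explicit schema at reading.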

Thanks to the common RDD abstraction, loading and saving graphs in GraphX don't differ a lot from dealing with more standard data structures. As we saw in the first section, loading a graph from an edges file is natively supported. However, it comes with some shortcomings, such as the inability to define edge and vertex attributes. Luckily, we can write a pretty similar mechanism with other loading methods and create the graph instance directly from mapped vertex and edge RDDs. The same rules apply to writing. Here too we can use the native formats provided by the RDD abstraction, and if we want, we can extend them with the sinks provided by the Dataset abstraction.

