https://github.com/bartosz25/spark-...rcode/graphx/edge/EdgeRddTest.scala
After last week's discovery of VertexRDD, we still have one graph-composing item to explain - EdgeRDD. After all, a graph is about relationships, and this RDD holds the links between vertices.
This post describes the edge representation in Apache Spark GraphX. The first section presents the transformations we can apply to it. The second one focuses on the directional character of the edges. The third one concentrates on the internal storage inside a partition. The post won't present the partitioning aspect because it's a little bit more complex than the partitioning of vertices and hence merits its own full blog post.
EdgeRDD
EdgeRDD is a subclass of RDD[Edge[ED]], where ED is the type of the edge attributes. The attributes are not the only component of an edge though. We can find all of them in the org.apache.spark.graphx.Edge class. Through it we can see that an edge is composed of 3 parts: the id of the source vertex, the attributes of the edge and the id of the target vertex.
Since the edge representation is an RDD, it shares a lot of common methods with it, such as filter, map or mapPartitions:
it should "filter and map edges" in {
  val edges = TestSparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C")))
  val graph = Graph.fromEdges(edges, "...")

  val edgesNotStartingFromVertex1 = graph.edges.filter(edge => edge.srcId != 1)
    .mapPartitions(edges => edges.map(edge => s"${edge.srcId}=${edge.attr}"))

  val collectedEdges = edgesNotStartingFromVertex1.collect()
  collectedEdges should have size 1
  collectedEdges(0) shouldEqual "2=C"
}
EdgeRDD also has more graph-specific transformations, for instance to reverse the direction of the relationship or to join it with another edge dataset:
it should "reverse the edges direction" in {
  val edges = TestSparkContext.parallelize(Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C")))
  val graph = Graph.fromEdges(edges, "...")

  val reversedEdges = graph.edges.reverse

  val collectedEdges = reversedEdges.map(edge => s"${edge.srcId}-->${edge.dstId}").collect()
  collectedEdges should have size 3
  collectedEdges should contain allOf("2-->1", "3-->1", "4-->2")
}

it should "join 2 edge datasets" in {
  val sharedEdges = Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C"))
  val edgesGraph1 = TestSparkContext.parallelize(sharedEdges ++ Seq(Edge(2L, 4L, "Cbis")))
  val graph1 = Graph.fromEdges(edgesGraph1, "...").partitionBy(PartitionStrategy.EdgePartition1D)
  val edgesGraph2 = TestSparkContext.parallelize(Seq(Edge(2L, 4L, "E")) ++ sharedEdges ++
    Seq(Edge(1L, 5L, "D"), Edge(2L, 4L, "F")))
  val graph2 = Graph.fromEdges(edgesGraph2, "...").partitionBy(PartitionStrategy.EdgePartition1D)

  val joinedEdges = graph1.edges.innerJoin(graph2.edges) {
    case (srcVertexId, dstVertexId, graph1Attr, graph2Attr) => s"${graph1Attr}${graph2Attr}"
  }

  val collectedEdges = joinedEdges.collect()
  collectedEdges should have size 4
  // As you can see, the join takes the first declared edge in the underlying arrays.
  // That's why we retrieve all occurrences of (2, 4) from graph1 and only the first one from graph2
  collectedEdges should contain allOf(Edge(1L, 2L, "AA"), Edge(1L, 3L, "BB"), Edge(2L, 4L, "CE"),
    Edge(2L, 4L, "CbisE"))
}
The second feature merits a few words of explanation. It assumes that both datasets are partitioned the same way. If that's not the case, the inner join won't work as expected. We can notice this in the following test case:
it should "not join differently partitioned edges" in {
  val sharedEdges = Seq(Edge(1L, 2L, "A"), Edge(1L, 3L, "B"), Edge(2L, 4L, "C"))
  val edgesGraph1 = TestSparkContext.parallelize(sharedEdges)
  val graph1 = Graph.fromEdges(edgesGraph1, "...").partitionBy(PartitionStrategy.EdgePartition1D)
  val edgesGraph2 = TestSparkContext.parallelize(sharedEdges)
  val graph2 = Graph.fromEdges(edgesGraph2, "...").partitionBy(PartitionStrategy.EdgePartition2D)

  val joinedEdges = graph1.edges.innerJoin(graph2.edges) {
    case (srcVertexId, dstVertexId, graph1Attr, graph2Attr) => s"${graph1Attr}${graph2Attr}"
  }

  val collectedEdges = joinedEdges.collect()
  collectedEdges shouldBe empty
}
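One way to picture this assumption is a join that only compares partitions sharing the same index. The sketch below is plain Scala without Spark, and everything in it is hypothetical: SimpleEdge stands in for the GraphX Edge, and the bySource and byDestination partitioners are toy functions, not the real EdgePartition1D and EdgePartition2D strategies. It only illustrates why identical edges can fail to meet when the two sides are distributed differently.

```scala
object PartitionWiseJoinSketch {
  // Simplified stand-in for org.apache.spark.graphx.Edge (assumption)
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

  // Toy partitioners, NOT the GraphX partition strategies
  val bySource: SimpleEdge => Int = edge => (edge.srcId % 2).toInt
  val byDestination: SimpleEdge => Int = edge => (edge.dstId % 2).toInt

  // Distributes the edges into numPartitions buckets
  def partition(edges: Seq[SimpleEdge], partitioner: SimpleEdge => Int,
                numPartitions: Int): Vector[Seq[SimpleEdge]] =
    Vector.tabulate(numPartitions)(id => edges.filter(edge => partitioner(edge) == id))

  // Partition i of the left side is only ever compared with partition i of the right side
  def partitionWiseInnerJoin(left: Vector[Seq[SimpleEdge]],
                             right: Vector[Seq[SimpleEdge]]): Seq[SimpleEdge] =
    left.zip(right).flatMap { case (leftEdges, rightEdges) =>
      for {
        l <- leftEdges
        r <- rightEdges
        if l.srcId == r.srcId && l.dstId == r.dstId
      } yield SimpleEdge(l.srcId, l.dstId, l.attr + r.attr)
    }

  val edges = Seq(SimpleEdge(1L, 2L, "A"), SimpleEdge(1L, 3L, "B"), SimpleEdge(2L, 4L, "C"))

  // Same partitioner on both sides: every edge finds its counterpart
  val samePartitioning =
    partitionWiseInnerJoin(partition(edges, bySource, 2), partition(edges, bySource, 2))

  // Different partitioners: some counterparts sit in another partition and are never compared
  val differentPartitioning =
    partitionWiseInnerJoin(partition(edges, bySource, 2), partition(edges, byDestination, 2))
}
```

With these toy partitioners only a part of the matches is lost. In the GraphX test above the two strategies happen to separate all the edges, hence the empty result.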
Direction
The relationship between the 2 vertices in the Edge case class is directed and the direction is resolved at the EdgeRDD's construction time. But it's purely conventional - the engine doesn't store any information about the edge direction. The direction is simply resolved from the declaration order of the vertex ids in the Edge case class.
If you remember the first post from the GraphX series about graph representation in Apache Spark GraphX, we tested Graph's collectEdges method. Just to recall, it returns all vertices with their closest relationships, which can be incoming, outgoing or both. It doesn't mean that the direction is stored somewhere else though. The collection of the neighbors consists of iterating over all edges in a given edge partition, extracting each edge with its attributes and sending it as a message either to the source vertex, the target vertex or both. You can see this happen in the aggregateMessagesEdgeScan method of the EdgePartition class:
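To make the convention concrete, here is a minimal sketch in plain Scala. SimpleEdge is a hypothetical, simplified stand-in for the GraphX Edge case class, and reverse and describe are helper names invented for the illustration. It shows that the direction is nothing more than which id sits in the source slot, so reversing an edge boils down to swapping both ids.

```scala
object EdgeDirectionSketch {
  // Simplified stand-in for org.apache.spark.graphx.Edge (assumption):
  // no direction flag, only 2 ids and an attribute
  final case class SimpleEdge[ED](srcId: Long, dstId: Long, attr: ED)

  // Reversing the direction only swaps the ids, the attribute is untouched
  def reverse[ED](edge: SimpleEdge[ED]): SimpleEdge[ED] =
    edge.copy(srcId = edge.dstId, dstId = edge.srcId)

  // The arrow is derived from the position of the ids, nothing else
  def describe[ED](edge: SimpleEdge[ED]): String = s"${edge.srcId}-->${edge.dstId}"

  val edge = SimpleEdge(1L, 2L, "A")
  val reversed = reverse(edge)
}
```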
val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
var i = 0
while (i < size) {
  val localSrcId = localSrcIds(i)
  val srcId = local2global(localSrcId)
  val localDstId = localDstIds(i)
  val dstId = local2global(localDstId)
  val edgeIsActive =
    if (activeness == EdgeActiveness.Neither) true
    else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
    else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
    else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
    else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
    else throw new Exception("unreachable")
  if (edgeIsActive) {
    val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
    val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
    ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
    sendMsg(ctx)
  }
  i += 1
}
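The message-based collection described above can be sketched without Spark. The code below is a simplified illustration and not the GraphX implementation: SimpleEdge, the Direction hierarchy (Incoming, Outgoing, Both) and the collectEdges helper are hypothetical names. Each edge is sent as a message to its source vertex, its target vertex or both, and the messages are then grouped per receiving vertex.

```scala
object NeighborCollectionSketch {
  // Simplified stand-in for org.apache.spark.graphx.Edge (assumption)
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

  // Hypothetical direction markers, loosely inspired by GraphX's EdgeDirection
  sealed trait Direction
  case object Incoming extends Direction
  case object Outgoing extends Direction
  case object Both extends Direction

  def collectEdges(edges: Seq[SimpleEdge], direction: Direction): Map[Long, Seq[SimpleEdge]] = {
    // "Send" every edge to the vertex (or vertices) interested in it
    val messages: Seq[(Long, SimpleEdge)] = edges.flatMap { edge =>
      direction match {
        case Incoming => Seq(edge.dstId -> edge) // the target vertex receives its incoming edge
        case Outgoing => Seq(edge.srcId -> edge) // the source vertex receives its outgoing edge
        case Both => Seq(edge.srcId -> edge, edge.dstId -> edge)
      }
    }
    // "Merge" the messages received by each vertex
    messages.groupBy(_._1).map { case (vertexId, received) => vertexId -> received.map(_._2) }
  }

  val edges = Seq(SimpleEdge(1L, 2L, "A"), SimpleEdge(1L, 3L, "B"), SimpleEdge(2L, 4L, "C"))
  val outgoing = collectEdges(edges, Outgoing)
  val incoming = collectEdges(edges, Incoming)
  val both = collectEdges(edges, Both)
}
```

The edges themselves never change between the 3 calls. Only the choice of the receiver differs, which is consistent with the direction being a convention rather than stored information.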
Columnar structure
As mentioned in the introduction, the edges are partitioned, but the partitioning strategies will be covered in the next post from the GraphX series. Here we will only explain how the edges are stored within partitions. The Scaladoc of the org.apache.spark.graphx.impl.EdgePartition class gives a lot of information about that:
* The edges are stored in columnar format in `localSrcIds`, `localDstIds`, and `data`.
The listed fields are arrays: 2 arrays of ints (localSrcIds and localDstIds) and 1 array of the edge attribute type (data). We can see the use of this columnar format in different places. One of them is the inner join transformation, where the engine moves through the arrays of both datasets by index to find the edges sharing the same source and target vertex ids:
val srcId = this.srcIds(i)
val dstId = this.dstIds(i)
// ... forward j to the index of the corresponding edge in `other`, and...
while (j < other.size && other.srcIds(j) < srcId) { j += 1 }
if (j < other.size && other.srcIds(j) == srcId) {
  while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
  if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
    // ... run `f` on the matching edge
  }
}
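The excerpt above can be turned into a small runnable sketch. It is a simplification under stated assumptions: Columns is a hypothetical container with global ids stored directly in srcIds and dstIds, the attributes are Strings, and both inputs are expected to be already sorted by source id and then by target id. The index j of the right side is never moved back and is not advanced after a match, so repeated edges on the left side all match the first corresponding edge on the right side.

```scala
import scala.collection.mutable.ArrayBuffer

object ColumnarMergeJoinSketch {
  // Hypothetical simplified partition: 3 parallel arrays sorted by (srcId, dstId)
  final case class Columns(srcIds: Array[Long], dstIds: Array[Long], data: Array[String]) {
    def size: Int = srcIds.length
  }

  def innerJoin(left: Columns, right: Columns)
               (f: (Long, Long, String, String) => String): Seq[(Long, Long, String)] = {
    val joined = ArrayBuffer.empty[(Long, Long, String)]
    var i = 0
    var j = 0
    while (i < left.size && j < right.size) {
      val srcId = left.srcIds(i)
      val dstId = left.dstIds(i)
      // Forward j while the right edge is "smaller" than the current left edge
      while (j < right.size && (right.srcIds(j) < srcId ||
        (right.srcIds(j) == srcId && right.dstIds(j) < dstId))) {
        j += 1
      }
      // Same source and target on both sides: combine the attributes
      if (j < right.size && right.srcIds(j) == srcId && right.dstIds(j) == dstId) {
        joined += ((srcId, dstId, f(srcId, dstId, left.data(i), right.data(j))))
      }
      i += 1
    }
    joined.toSeq
  }

  // Same data as in the "join 2 edge datasets" test, already sorted
  val graph1Edges = Columns(Array(1L, 1L, 2L, 2L), Array(2L, 3L, 4L, 4L),
    Array("A", "B", "C", "Cbis"))
  val graph2Edges = Columns(Array(1L, 1L, 1L, 2L, 2L, 2L), Array(2L, 3L, 5L, 4L, 4L, 4L),
    Array("A", "B", "D", "E", "C", "F"))

  val joinedEdges = innerJoin(graph1Edges, graph2Edges) {
    (_, _, leftAttr, rightAttr) => leftAttr + rightAttr
  }
}
```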
Does it mean that the position of a given vertex is identical, independently of the dataset? Not really. But there is some determinism in the construction of these columnar data structures. We can see it in the org.apache.spark.graphx.impl.EdgePartitionBuilder#toEdgePartition method where, before building the arrays, the edges are sorted in lexicographic order:
val edgeArray = edges.trim().array
new Sorter(Edge.edgeArraySortDataFormat[ED])
  .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)

object Edge {
  private[graphx] def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
    override def compare(a: Edge[ED], b: Edge[ED]): Int = {
      if (a.srcId == b.srcId) {
        if (a.dstId == b.dstId) 0
        else if (a.dstId < b.dstId) -1
        else 1
      } else if (a.srcId < b.srcId) -1
      else 1
    }
  }
  // ...
}
The edges are thus sorted in ascending order, first by source vertex id and then by target vertex id. You can see how it works in the following example:
"edges" should "be sorted in asc order" in { val testEdges: Array[Edge[Int]] = Array( Edge(3L, 2L, 1), Edge(1L, 4L, 1), Edge(1L, 2L, 1), Edge(5L, 4L, 6), Edge(5L, 2L, 1), Edge(5L, 2L, 5), Edge(5L, 2L, 2) ) val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int]) sortedEdges should contain inOrderElementsOf Seq(Edge(1L, 2L, 1), Edge(1L, 4L, 1), Edge(3L, 2L, 1), Edge(5L, 2L, 1), Edge(5L, 2L, 5), Edge(5L, 2L, 2), Edge(5L, 4L, 6)) }
EdgeRDD is another component of the Graph object in GraphX. Aside from classical RDD features like map, filter or cache, it has more graph-specific transformations. As shown in the first section, we can reverse the direction of the edges. But it doesn't mean that the Edge class stores the direction between vertices. Instead, GraphX uses an implicit rule to figure out the relationship direction - it's the definition order of the vertices that matters. The post didn't cover the partitioning strategies because, as a more complex topic, they merit their own analysis.