Motifs finding in GraphFrames

on waitingforcode.com

Motifs finding in GraphFrames

In the previous post in GraphFrames category I mentioned the motifs finding feature and promised to write a separate post about it. After several weeks dedicated to the Apache Spark 2.4.0 features, I finally managed to find some time to explore the motifs finding in GraphFrames.

The post starts with a section explaining motifs finding in graphs. The next part shows how to use this feature in the GraphFrames. In the last section, you will find the execution details.

Graph motifs finding

Motifs finding is also known as graph pattern matching. If you've already done some Scala, you have certainly met the pattern matching which consists on checking a value against some pattern. Applied to the graph, the pattern matching does almost the same, i.e. it finds some pattern inside the graph. The pattern is an expression used to define some connected vertices. For instance, if you apply the (person)-[works]->(Paris) on the following graph, you should get back the matches marked in red:

In our social media era, you can use motifs finding to suggest people to follow or friend candidates. The matching expression for that social use case can be defined as (person1)-[follows]->(person2); (person2)-[follows]->(person3); !(person1)-[follows]->(person3) where the ! is used to negate the pattern.

Motifs finding in GraphFrames

GraphFrames project offers the motifs finding feature through find(pattern: String) method. As you can see, it takes a pattern to match in the parameter. The pattern is later parsed to one of the following Pattern implementations:

  • Negation - represents a negated expression, like for instance:
      private def testGraph: GraphFrame = {
        val followerRelationship = "follows"
        val people = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"), (5L, "user5")).toDF("id", "name")
        val relationships = Seq((1L, 2L, followerRelationship), (1L, 3L, followerRelationship), (2L, 3L, followerRelationship),
          (3L, 4L, followerRelationship), (4L, 3L, followerRelationship)).toDF("src", "dst", "label")
        // graph: (user1)---[follows]-->(user2)---[follows]--->(user3)<---[follows]--->(user4)
        //                 |                                                                   ^
        //                  -----------------------------[follows]------------|
        GraphFrame(people, relationships)
      }
    
      "negated pattern" should "be used to find every user who doesn't follow user3" in {
        val graph = testGraph
    
        val matchesWithoutNegation = graph.find("(user_1)-[follows]->(user_2)")
          .select(functions.concat($"user_1.name", functions.lit("--[follows]-->"), $"user_2.name").as("match"))
          .where(raw"""follows.label = "follows"""")
        val matchesWithNegation = graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)")
          .select(functions.concat($"user_1.name", functions.lit("--[follows]-->"), $"user_2.name").as("match"))
          .where(raw"""follows.label = "follows"""")
    
        val followersWithoutNegation = matchesWithoutNegation.collect().map(row => row.getAs[String]("match"))
        followersWithoutNegation should have size 5
        followersWithoutNegation should contain allOf("user1--[follows]-->user3", "user1--[follows]-->user2",
          "user2--[follows]-->user3", "user3--[follows]-->user4", "user4--[follows]-->user3")
        val followersWithNegation = matchesWithNegation.collect().map(row => row.getAs[String]("match"))
        followersWithNegation should have size 3
        followersWithNegation should contain allOf("user1--[follows]-->user3", "user1--[follows]-->user2", "user2--[follows]-->user3")
      }
    
  • AnonymousVertex - represents an anonymous vertex, i.e. the vertex those properties are ignored in the matching query. This vertex also won't be included as a column in the returned dataset.
  • NamedVertex - represents a named vertex, i.e. vertex those properties may be involved in the matching expression
  • AnonymousEdge - same as AnonymousVertex but applied to an edge
  • NamedEdge - same as NamedVertex but applied to an edge

Below you can find an example for named and anonymous vertices together with named and anonymous edges:

  "anonymous and named elements" should "be used to find all vertices with followers except user3" in {
    val graph = testGraph

    val matches = graph.find("()-[follows]->(user)")
      .select("user.name")
      .where(raw"""follows.label = "follows" AND user.name != "user3" """)

    val followedUsers = matches.collect().map(row => row.getAs[String]("name"))
    followedUsers should have size 2
    followedUsers should contain allOf("user2", "user4")
  }

Motifs finding implementation

As you can see, the motifs finding is very powerful. You can chain the patterns with ";", negate them and also apply the native Apache Spark SQL operations. It's quite a lot for a feature that at first glance looks complex. It's then important to know what happens under-the-hood. I will use the last example of the previous section for the analysis. The graph.find("(user_1)-[follows]->(user_2)").explain(true) query returns the following execution plans:

== Optimized Logical Plan ==
Project [user_1#120, follows#118, user_2#122]
+- Join Inner, (follows#118.dst = user_2#122.id)
   :- Join Inner, (follows#118.src = user_1#120.id)
   :  :- LocalRelation [follows#118]
   :  +- LocalRelation [user_1#120]
   +- LocalRelation [user_2#122]

== Physical Plan ==
*(1) Project [user_1#120, follows#118, user_2#122]
+- *(1) BroadcastHashJoin [follows#118.dst], [user_2#122.id], Inner, BuildRight
   :- *(1) BroadcastHashJoin [follows#118.src], [user_1#120.id], Inner, BuildRight
   :  :- LocalTableScan [follows#118]
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
   :     +- LocalTableScan [user_1#120]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
      +- LocalTableScan [user_2#122]

As you can notice, under-the-hood the engine executes the motifs finding as multiple joins. It looks fine for the simple query but become much more complex for the second type of the query launched in the previous section which is the query with a negation. The graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)").explain(true) gives the following execution plan:

== Optimized Logical Plan ==
Aggregate [follows#22, user_1#24, user_2#26], [user_1#24, follows#22, user_2#26]
+- Join LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
   :- Join Inner, (follows#22.dst = user_2#26.id)
   :  :- Join Inner, (follows#22.src = user_1#24.id)
   :  :  :- LocalRelation [follows#22]
   :  :  +- LocalRelation [user_1#24]
   :  +- LocalRelation [user_2#26]
   +- Project [follows#68, user_1#71, user_2#73]
      +- Join Inner, ((__tmp-4363599943432734077#47.src = user_2#73.id) && (__tmp-4363599943432734077#47.dst = user_1#71.id))
         :- Join Inner, (follows#68.dst = user_2#73.id)
         :  :- Join Inner, (follows#68.src = user_1#71.id)
         :  :  :- LocalRelation [follows#68]
         :  :  +- LocalRelation [user_1#71]
         :  +- LocalRelation [user_2#73]
         +- LocalRelation [__tmp-4363599943432734077#47]

== Physical Plan ==
*(6) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[user_1#24, follows#22, user_2#26])
+- Exchange hashpartitioning(follows#22, user_1#24, user_2#26, 200)
   +- *(5) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[follows#22, user_1#24, user_2#26])
      +- SortMergeJoin [coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,])], [coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,])], LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
         :- *(2) Sort [coalesce(follows#22, [0,0,]) ASC NULLS FIRST, coalesce(user_1#24, [0,]) ASC NULLS FIRST, coalesce(user_2#26, [0,]) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,]), 200)
         :     +- *(1) BroadcastHashJoin [follows#22.dst], [user_2#26.id], Inner, BuildRight
         :        :- *(1) BroadcastHashJoin [follows#22.src], [user_1#24.id], Inner, BuildRight
         :        :  :- LocalTableScan [follows#22]
         :        :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :        :     +- LocalTableScan [user_1#24]
         :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :           +- LocalTableScan [user_2#26]
         +- *(4) Sort [coalesce(follows#68, [0,0,]) ASC NULLS FIRST, coalesce(user_1#71, [0,]) ASC NULLS FIRST, coalesce(user_2#73, [0,]) ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,]), 200)
               +- *(3) Project [follows#68, user_1#71, user_2#73]
                  +- *(3) BroadcastHashJoin [user_2#73.id, user_1#71.id], [__tmp-4363599943432734077#47.src, __tmp-4363599943432734077#47.dst], Inner, BuildRight
                     :- *(3) BroadcastHashJoin [follows#68.dst], [user_2#73.id], Inner, BuildRight
                     :  :- *(3) BroadcastHashJoin [follows#68.src], [user_1#71.id], Inner, BuildRight
                     :  :  :- LocalTableScan [follows#68]
                     :  :  +- ReusedExchange [user_1#71], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     :  +- ReusedExchange [user_2#73], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].src, input[0, struct, false].dst))
                        +- LocalTableScan [__tmp-4363599943432734077#47]

Here, the optimized logical plan for the not negated motif is the same as in the previous case. The negation itself is executed as a left anti-join (click to read more about Join type in Spark SQL) which returns all rows of the left dataset that are not present in the right one. Thus in both cases, the plan uses different joins and exchanges the data between executors. It can always have a negative impact on overall performance. In some months I will try to show the impact of such a query execution plan on a big volume of input data.

GraphFrames project uses a lot of Apache Spark SQL features. Ones of them are the joins which are the basic execution unit for the motifs finding feature described in this post. The motifs finding is very similar to the pattern matching because it tries to check whether some specific pattern exists in something else (the graph). As illustrated in the second and the third sections of the post, motifs finding applies to positive and negative matches. In both cases, GraphFrames executes them as the joins.

If you liked it, you should read: Creating graphs in GraphFrames

Share, like or comment this post on Twitter:

Share on: