Motifs finding in GraphFrames

Versions: GraphFrames 0.6

In the previous post in the GraphFrames category I mentioned the motifs finding feature and promised to write a separate post about it. After several weeks dedicated to the Apache Spark 2.4.0 features, I finally found some time to explore motifs finding in GraphFrames.

The post starts with a section explaining motifs finding in graphs. The next part shows how to use this feature in GraphFrames. In the last section, you will find the execution details.

Graph motifs finding

Motifs finding is also known as graph pattern matching. If you've already written some Scala, you have certainly met pattern matching, which consists of checking a value against a pattern. Applied to a graph, pattern matching does almost the same: it finds the occurrences of a pattern inside the graph. The pattern is an expression that describes some connected vertices. For instance, if you apply (person)-[works]->(Paris) to the following graph, you should get back the matches marked in red:

In our social media era, you can use motifs finding to suggest people to follow or friend candidates. The matching expression for that social use case can be defined as (person1)-[follows]->(person2); (person2)-[follows]->(person3); !(person1)-[follows]->(person3) where the ! is used to negate the pattern.
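
To make the semantics of that expression concrete, here is a minimal sketch in plain Scala, without any graph library. The object name, the `follows` set and the user names are all hypothetical and serve only as an illustration: the two positive terms become two iterations over the edges, and the negated term becomes a "does not exist" check.

```scala
object FollowSuggestions {
  // Hypothetical in-memory graph: each pair is (follower, followed)
  val follows: Set[(String, String)] = Set(
    "alice" -> "bob",
    "bob" -> "carol",
    "bob" -> "dave",
    "alice" -> "dave"
  )

  // (person1)-[follows]->(person2); (person2)-[follows]->(person3); !(person1)-[follows]->(person3)
  def suggestions(edges: Set[(String, String)]): Set[(String, String)] =
    for {
      (person1, person2) <- edges
      (middle, person3) <- edges
      if middle == person2                   // the two edges share person2
      if person1 != person3                  // do not suggest following yourself
      if !edges.contains(person1 -> person3) // negated term: the edge must not exist yet
    } yield person1 -> person3

  def main(args: Array[String]): Unit =
    suggestions(follows).foreach { case (who, candidate) =>
      println(s"$who could follow $candidate")
    }
}
```

In this tiny graph, dave is not suggested to alice because she already follows him, which is exactly the role of the negated term.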

Motifs finding in GraphFrames

The GraphFrames project exposes the motifs finding feature through the find(pattern: String) method. As you can see, it takes the pattern to match as its parameter. The pattern is later parsed to one of the following Pattern implementations:

Below you can find an example for named and anonymous vertices together with named and anonymous edges:

  "anonymous and named elements" should "be used to find all vertices with followers except user3" in {
    val graph = testGraph

    // () is an anonymous vertex; [follows] and (user) are named and become result columns
    val matches = graph.find("()-[follows]->(user)")
      // filter on the matched structs first, then keep only the followed user's name
      .where(raw"""follows.label = "follows" AND user.name != "user3" """)
      .select("user.name")

    val followedUsers = matches.collect().map(row => row.getAs[String]("name"))
    followedUsers should have size 2
    followedUsers should contain allOf("user2", "user4")
  }
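
The same method also accepts chained and negated terms. Below is a minimal sketch, assuming a local SparkSession and a small hypothetical graph (the `MotifSketch` object, the vertex names and the edges are made up for illustration; only `GraphFrame(...)`, `find` and the standard DataFrame operations come from the libraries). Note that the negated term uses an anonymous edge, since a named edge inside a negation could never appear in the results.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.graphframes.GraphFrame

object MotifSketch {
  // Hypothetical data: GraphFrames expects an "id" column for the vertices
  // and "src"/"dst" columns for the edges
  def buildGraph(spark: SparkSession): GraphFrame = {
    import spark.implicits._
    val vertices = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"))
      .toDF("id", "name")
    val edges = Seq((1L, 2L, "follows"), (2L, 1L, "follows"), (1L, 3L, "follows"), (3L, 4L, "follows"))
      .toDF("src", "dst", "label")
    GraphFrame(vertices, edges)
  }

  // Chained terms: two consecutive edges sharing the vertex b
  def twoHops(graph: GraphFrame): DataFrame =
    graph.find("(a)-[ab]->(b); (b)-[bc]->(c)")
      .where("a.id != c.id")
      .selectExpr("a.name AS source", "c.name AS target")

  // Negated term: user_1 follows user_2 but user_2 does not follow back
  def notFollowedBack(graph: GraphFrame): DataFrame =
    graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)")
      .selectExpr("user_1.name AS follower", "user_2.name AS followed")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("motif-sketch")
      .master("local[*]")
      .getOrCreate()
    val graph = buildGraph(spark)
    twoHops(graph).show()
    notFollowedBack(graph).show()
    spark.stop()
  }
}
```

Every named element of the pattern becomes a struct column of the returned DataFrame, which is why the usual `where` and `selectExpr` calls can be applied to the matches.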

Motifs finding implementation

As you can see, motifs finding is very powerful. You can chain the patterns with ";", negate them and also apply the native Apache Spark SQL operations to the result. That is a lot of expressive power behind a single method, so it's important to know what happens under the hood. I will use the last example of the previous section for the analysis. The graph.find("(user_1)-[follows]->(user_2)").explain(true) query returns the following execution plans:

== Optimized Logical Plan ==
Project [user_1#120, follows#118, user_2#122]
+- Join Inner, (follows#118.dst = user_2#122.id)
   :- Join Inner, (follows#118.src = user_1#120.id)
   :  :- LocalRelation [follows#118]
   :  +- LocalRelation [user_1#120]
   +- LocalRelation [user_2#122]

== Physical Plan ==
*(1) Project [user_1#120, follows#118, user_2#122]
+- *(1) BroadcastHashJoin [follows#118.dst], [user_2#122.id], Inner, BuildRight
   :- *(1) BroadcastHashJoin [follows#118.src], [user_1#120.id], Inner, BuildRight
   :  :- LocalTableScan [follows#118]
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
   :     +- LocalTableScan [user_1#120]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
      +- LocalTableScan [user_2#122]

As you can notice, under the hood the engine executes motifs finding as multiple joins. It looks fine for the simple query but becomes much more complex for the second type of query launched in the previous section, the one with a negation. The graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)").explain(true) gives the following execution plan:

== Optimized Logical Plan ==
Aggregate [follows#22, user_1#24, user_2#26], [user_1#24, follows#22, user_2#26]
+- Join LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
   :- Join Inner, (follows#22.dst = user_2#26.id)
   :  :- Join Inner, (follows#22.src = user_1#24.id)
   :  :  :- LocalRelation [follows#22]
   :  :  +- LocalRelation [user_1#24]
   :  +- LocalRelation [user_2#26]
   +- Project [follows#68, user_1#71, user_2#73]
      +- Join Inner, ((__tmp-4363599943432734077#47.src = user_2#73.id) && (__tmp-4363599943432734077#47.dst = user_1#71.id))
         :- Join Inner, (follows#68.dst = user_2#73.id)
         :  :- Join Inner, (follows#68.src = user_1#71.id)
         :  :  :- LocalRelation [follows#68]
         :  :  +- LocalRelation [user_1#71]
         :  +- LocalRelation [user_2#73]
         +- LocalRelation [__tmp-4363599943432734077#47]

== Physical Plan ==
*(6) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[user_1#24, follows#22, user_2#26])
+- Exchange hashpartitioning(follows#22, user_1#24, user_2#26, 200)
   +- *(5) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[follows#22, user_1#24, user_2#26])
      +- SortMergeJoin [coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,])], [coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,])], LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
         :- *(2) Sort [coalesce(follows#22, [0,0,]) ASC NULLS FIRST, coalesce(user_1#24, [0,]) ASC NULLS FIRST, coalesce(user_2#26, [0,]) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,]), 200)
         :     +- *(1) BroadcastHashJoin [follows#22.dst], [user_2#26.id], Inner, BuildRight
         :        :- *(1) BroadcastHashJoin [follows#22.src], [user_1#24.id], Inner, BuildRight
         :        :  :- LocalTableScan [follows#22]
         :        :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :        :     +- LocalTableScan [user_1#24]
         :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :           +- LocalTableScan [user_2#26]
         +- *(4) Sort [coalesce(follows#68, [0,0,]) ASC NULLS FIRST, coalesce(user_1#71, [0,]) ASC NULLS FIRST, coalesce(user_2#73, [0,]) ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,]), 200)
               +- *(3) Project [follows#68, user_1#71, user_2#73]
                  +- *(3) BroadcastHashJoin [user_2#73.id, user_1#71.id], [__tmp-4363599943432734077#47.src, __tmp-4363599943432734077#47.dst], Inner, BuildRight
                     :- *(3) BroadcastHashJoin [follows#68.dst], [user_2#73.id], Inner, BuildRight
                     :  :- *(3) BroadcastHashJoin [follows#68.src], [user_1#71.id], Inner, BuildRight
                     :  :  :- LocalTableScan [follows#68]
                     :  :  +- ReusedExchange [user_1#71], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     :  +- ReusedExchange [user_2#73], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].src, input[0, struct, false].dst))
                        +- LocalTableScan [__tmp-4363599943432734077#47]

Here, the optimized logical plan for the non-negated motif is the same as in the previous case. The negation itself is executed as a left anti join, which returns all rows of the left dataset that have no match in the right one. Thus in both cases the plan relies on joins and exchanges data between executors, which can have a negative impact on overall performance. In some months I will try to show the impact of such a query execution plan on a big volume of input data.
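
The join-based execution can be mimicked with plain Scala collections. The sketch below is a simplification and not the GraphFrames implementation: the `MotifAsJoins` object, the case classes and the sample data are hypothetical, the two inner joins are written as a for comprehension, and the left anti join is reduced to a "no reverse edge exists" filter instead of the row-by-row comparison visible in the plan.

```scala
object MotifAsJoins {
  final case class Vertex(id: Long, name: String)
  final case class Edge(src: Long, dst: Long, label: String)

  // Hypothetical sample data
  val vertices = Seq(Vertex(1L, "user1"), Vertex(2L, "user2"), Vertex(3L, "user3"), Vertex(4L, "user4"))
  val edges = Seq(Edge(1L, 2L, "follows"), Edge(2L, 1L, "follows"), Edge(1L, 3L, "follows"), Edge(3L, 4L, "follows"))

  // (user_1)-[follows]->(user_2): two inner joins,
  // follows.src = user_1.id and follows.dst = user_2.id
  def matchEdges(vs: Seq[Vertex], es: Seq[Edge]): Seq[(Vertex, Edge, Vertex)] =
    for {
      follows <- es
      user1 <- vs if follows.src == user1.id
      user2 <- vs if follows.dst == user2.id
    } yield (user1, follows, user2)

  // !(user_2)-[]->(user_1): left anti join, keeping only the left rows
  // for which no edge goes from user_2 back to user_1
  def withoutReverseEdge(matches: Seq[(Vertex, Edge, Vertex)], es: Seq[Edge]): Seq[(Vertex, Edge, Vertex)] =
    matches.filterNot { case (user1, _, user2) =>
      es.exists(edge => edge.src == user2.id && edge.dst == user1.id)
    }

  def main(args: Array[String]): Unit =
    withoutReverseEdge(matchEdges(vertices, edges), edges).foreach { case (user1, _, user2) =>
      println(s"${user1.name} follows ${user2.name} without being followed back")
    }
}
```

On a real cluster each of these steps works on distributed datasets, which is where the broadcast and shuffle exchanges of the physical plan come from.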

The GraphFrames project uses a lot of Apache Spark SQL features. One of them is joins, which are the basic execution unit of the motifs finding feature described in this post. Motifs finding is very similar to pattern matching because it checks whether a specific pattern exists in something else (here, the graph). As illustrated in the second and third sections of the post, motifs finding applies to positive and negative matches. In both cases, GraphFrames executes them as joins.