Motifs finding in GraphFrames

In the previous post in the GraphFrames category I mentioned the motifs finding feature and promised to write a separate post about it. After several weeks dedicated to the Apache Spark 2.4.0 features, I finally managed to find some time to explore motifs finding in GraphFrames.

Bartosz Konieczny

The post starts with a section explaining motifs finding in graphs. The next part shows how to use this feature in GraphFrames. In the last section, you will find the execution details.

Graph motifs finding

Motifs finding is also known as graph pattern matching. If you've already written some Scala, you have certainly met pattern matching, which consists of checking a value against some pattern. Applied to a graph, pattern matching does almost the same thing, i.e. it finds a given pattern inside the graph. The pattern is an expression defining some connected vertices. For instance, if you apply the pattern (person)-[works]->(Paris) to the following graph, you should get back the matches marked in red:
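To make the analogy concrete, here is a minimal plain-Scala sketch (no GraphFrames involved; the `Edge` class and sample data are mine): Scala's match expression checks a single value against a shape, while graph pattern matching scans all edges for triples fitting the shape (person)-[works]->(Paris):

```scala
// Plain-Scala illustration, not GraphFrames code.
case class Edge(src: String, label: String, dst: String)

// Scala's pattern matching checks one value against a pattern...
def describe(value: Any): String = value match {
  case (name: String, "works", "Paris") => s"$name works in Paris"
  case _                                => "no match"
}

// ...while graph pattern matching finds every edge in the graph
// fitting the shape (person)-[works]->(Paris).
def worksInParis(edges: Seq[Edge]): Seq[String] =
  edges.collect { case Edge(person, "works", "Paris") => person }

val sampleEdges = Seq(
  Edge("user1", "works", "Paris"),
  Edge("user2", "works", "London"),
  Edge("user3", "works", "Paris")
)
```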

In our social media era, you can use motifs finding to suggest people to follow or friend candidates. The matching expression for that social use case can be defined as (person1)-[follows]->(person2); (person2)-[follows]->(person3); !(person1)-[follows]->(person3) where the ! is used to negate the pattern.

Motifs finding in GraphFrames

The GraphFrames project offers the motifs finding feature through the find(pattern: String) method. As you can see, it takes the pattern to match as its parameter. The pattern is later parsed into Pattern implementations representing named or anonymous vertices, named or anonymous edges, and negated edges.

Below you can find an example combining an anonymous vertex, a named edge and a named vertex:

  "anonymous and named elements" should "be used to find all vertices with followers except user3" in {
    val graph = testGraph

    val matches = graph.find("()-[follows]->(user)")
      .select("user.name")
      .where(raw"""follows.label = "follows" AND user.name != "user3" """)

    val followedUsers = matches.collect().map(row => row.getAs[String]("name"))
    followedUsers should have size 2
    followedUsers should contain allOf("user2", "user4")
  }

Motifs finding implementation

As you can see, motifs finding is very powerful. You can chain patterns with ";", negate them, and also apply native Apache Spark SQL operations on the results. It's quite a lot for a feature that at first glance looks complex. It's then important to know what happens under the hood. I will use the last example of the previous section for the analysis. The graph.find("(user_1)-[follows]->(user_2)").explain(true) query returns the following execution plans:

== Optimized Logical Plan ==
Project [user_1#120, follows#118, user_2#122]
+- Join Inner, (follows#118.dst = user_2#122.id)
   :- Join Inner, (follows#118.src = user_1#120.id)
   :  :- LocalRelation [follows#118]
   :  +- LocalRelation [user_1#120]
   +- LocalRelation [user_2#122]

== Physical Plan ==
*(1) Project [user_1#120, follows#118, user_2#122]
+- *(1) BroadcastHashJoin [follows#118.dst], [user_2#122.id], Inner, BuildRight
   :- *(1) BroadcastHashJoin [follows#118.src], [user_1#120.id], Inner, BuildRight
   :  :- LocalTableScan [follows#118]
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
   :     +- LocalTableScan [user_1#120]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
      +- LocalTableScan [user_2#122]
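The two inner joins in the plan can be mimicked with plain Scala collections (a sketch of the join semantics, not of GraphFrames internals; the classes and data are mine): the edge list is joined once on src to bind user_1 and once on dst to bind user_2:

```scala
// Collection-level sketch of the plan: (user_1)-[follows]->(user_2)
// becomes edges joined with vertices on src, then with vertices on dst.
case class Vertex(id: String, name: String)
case class Edge(src: String, dst: String, label: String)

def findFollows(vertices: Seq[Vertex], edges: Seq[Edge]): Seq[(Vertex, Edge, Vertex)] =
  for {
    follows <- edges
    user1   <- vertices if user1.id == follows.src // inner join: follows.src = user_1.id
    user2   <- vertices if user2.id == follows.dst // inner join: follows.dst = user_2.id
  } yield (user1, follows, user2)

val sampleVertices = Seq(Vertex("1", "user1"), Vertex("2", "user2"))
val sampleEdges    = Seq(Edge("1", "2", "follows"))
```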

As you can notice, under the hood the engine executes motifs finding as multiple joins. It looks fine for the simple query but becomes much more complex for the second type of query launched in the previous section, the one with a negation. The graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)").explain(true) gives the following execution plan:

== Optimized Logical Plan ==
Aggregate [follows#22, user_1#24, user_2#26], [user_1#24, follows#22, user_2#26]
+- Join LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
   :- Join Inner, (follows#22.dst = user_2#26.id)
   :  :- Join Inner, (follows#22.src = user_1#24.id)
   :  :  :- LocalRelation [follows#22]
   :  :  +- LocalRelation [user_1#24]
   :  +- LocalRelation [user_2#26]
   +- Project [follows#68, user_1#71, user_2#73]
      +- Join Inner, ((__tmp-4363599943432734077#47.src = user_2#73.id) && (__tmp-4363599943432734077#47.dst = user_1#71.id))
         :- Join Inner, (follows#68.dst = user_2#73.id)
         :  :- Join Inner, (follows#68.src = user_1#71.id)
         :  :  :- LocalRelation [follows#68]
         :  :  +- LocalRelation [user_1#71]
         :  +- LocalRelation [user_2#73]
         +- LocalRelation [__tmp-4363599943432734077#47]

== Physical Plan ==
*(6) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[user_1#24, follows#22, user_2#26])
+- Exchange hashpartitioning(follows#22, user_1#24, user_2#26, 200)
   +- *(5) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[follows#22, user_1#24, user_2#26])
      +- SortMergeJoin [coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,])], [coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,])], LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
         :- *(2) Sort [coalesce(follows#22, [0,0,]) ASC NULLS FIRST, coalesce(user_1#24, [0,]) ASC NULLS FIRST, coalesce(user_2#26, [0,]) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,]), 200)
         :     +- *(1) BroadcastHashJoin [follows#22.dst], [user_2#26.id], Inner, BuildRight
         :        :- *(1) BroadcastHashJoin [follows#22.src], [user_1#24.id], Inner, BuildRight
         :        :  :- LocalTableScan [follows#22]
         :        :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :        :     +- LocalTableScan [user_1#24]
         :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
         :           +- LocalTableScan [user_2#26]
         +- *(4) Sort [coalesce(follows#68, [0,0,]) ASC NULLS FIRST, coalesce(user_1#71, [0,]) ASC NULLS FIRST, coalesce(user_2#73, [0,]) ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,]), 200)
               +- *(3) Project [follows#68, user_1#71, user_2#73]
                  +- *(3) BroadcastHashJoin [user_2#73.id, user_1#71.id], [__tmp-4363599943432734077#47.src, __tmp-4363599943432734077#47.dst], Inner, BuildRight
                     :- *(3) BroadcastHashJoin [follows#68.dst], [user_2#73.id], Inner, BuildRight
                     :  :- *(3) BroadcastHashJoin [follows#68.src], [user_1#71.id], Inner, BuildRight
                     :  :  :- LocalTableScan [follows#68]
                     :  :  +- ReusedExchange [user_1#71], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     :  +- ReusedExchange [user_2#73], BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].id))
                     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct, false].src, input[0, struct, false].dst))
                        +- LocalTableScan [__tmp-4363599943432734077#47]

Here, the optimized logical plan for the non-negated motif is the same as in the previous case. The negation itself is executed as a left anti-join (covered in my post about join types in Spark SQL), which returns all rows of the left dataset that have no match in the right one. Thus, in both cases the plan uses several joins and exchanges data between executors, which can have a negative impact on overall performance. In some months I will try to show the impact of such an execution plan on a big volume of input data.
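The left anti-join semantics can be summarized in a few lines of plain Scala (a simplified sketch; the real plan compares whole rows with null-safe equality, while this version compares plain values):

```scala
// Left anti join: keep the rows of the left dataset with no match on the right.
def leftAntiJoin[A](left: Seq[A], right: Seq[A]): Seq[A] = {
  val rightSet = right.toSet
  left.filterNot(rightSet.contains)
}

// All positive matches minus the ones also matched by the negated pattern.
val positiveMatches = Seq("match1", "match2", "match3")
val negatedMatches  = Seq("match2")
```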

The GraphFrames project uses a lot of Apache Spark SQL features. One of them is the join, which is the basic execution unit for the motifs finding feature described in this post. Motifs finding is very similar to pattern matching because it checks whether some specific pattern exists in something else (the graph). As illustrated in the second and third sections of the post, motifs finding applies to both positive and negative matches. In both cases, GraphFrames executes them as joins.
