# Motifs finding in GraphFrames

on waitingforcode.com

## Motifs finding in GraphFrames

You want to become a data engineer and don't know where to start? I was like you 4 years ago when I started to learn the data. From that experience I prepared a 12-weeks course that will help you to become a data engineer. Join the class today! Join the class!
In the previous post in GraphFrames category I mentioned the motifs finding feature and promised to write a separate post about it. After several weeks dedicated to the Apache Spark 2.4.0 features, I finally managed to find some time to explore the motifs finding in GraphFrames.

The post starts with a section explaining motifs finding in graphs. The next part shows how to use this feature in the GraphFrames. In the last section, you will find the execution details.

## Graph motifs finding

Motifs finding is also known as graph pattern matching. If you've already done some Scala, you have certainly met the pattern matching which consists on checking a value against some pattern. Applied to the graph, the pattern matching does almost the same, i.e. it finds some pattern inside the graph. The pattern is an expression used to define some connected vertices. For instance, if you apply the (person)-[works]->(Paris) on the following graph, you should get back the matches marked in red:

In our social media era, you can use motifs finding to suggest people to follow or friend candidates. The matching expression for that social use case can be defined as (person1)-[follows]->(person2); (person2)-[follows]->(person3); !(person1)-[follows]->(person3) where the ! is used to negate the pattern.

## Motifs finding in GraphFrames

GraphFrames project offers the motifs finding feature through find(pattern: String) method. As you can see, it takes a pattern to match in the parameter. The pattern is later parsed to one of the following Pattern implementations:

• Negation - represents a negated expression, like for instance:
```  private def testGraph: GraphFrame = {
val followerRelationship = "follows"
val people = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user4"), (5L, "user5")).toDF("id", "name")
val relationships = Seq((1L, 2L, followerRelationship), (1L, 3L, followerRelationship), (2L, 3L, followerRelationship),
(3L, 4L, followerRelationship), (4L, 3L, followerRelationship)).toDF("src", "dst", "label")
// graph: (user1)---[follows]-->(user2)---[follows]--->(user3)<---[follows]--->(user4)
//                 |                                                                   ^
//                  -----------------------------[follows]------------|
GraphFrame(people, relationships)
}

"negated pattern" should "be used to find every user who doesn't follow user3" in {
val graph = testGraph

val matchesWithoutNegation = graph.find("(user_1)-[follows]->(user_2)")
.select(functions.concat(\$"user_1.name", functions.lit("--[follows]-->"), \$"user_2.name").as("match"))
.where(raw"""follows.label = "follows"""")
val matchesWithNegation = graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)")
.select(functions.concat(\$"user_1.name", functions.lit("--[follows]-->"), \$"user_2.name").as("match"))
.where(raw"""follows.label = "follows"""")

val followersWithoutNegation = matchesWithoutNegation.collect().map(row => row.getAs[String]("match"))
followersWithoutNegation should have size 5
followersWithoutNegation should contain allOf("user1--[follows]-->user3", "user1--[follows]-->user2",
"user2--[follows]-->user3", "user3--[follows]-->user4", "user4--[follows]-->user3")
val followersWithNegation = matchesWithNegation.collect().map(row => row.getAs[String]("match"))
followersWithNegation should have size 3
followersWithNegation should contain allOf("user1--[follows]-->user3", "user1--[follows]-->user2", "user2--[follows]-->user3")
}
```
• AnonymousVertex - represents an anonymous vertex, i.e. the vertex those properties are ignored in the matching query. This vertex also won't be included as a column in the returned dataset.
• NamedVertex - represents a named vertex, i.e. vertex those properties may be involved in the matching expression
• AnonymousEdge - same as AnonymousVertex but applied to an edge
• NamedEdge - same as NamedVertex but applied to an edge

Below you can find an example for named and anonymous vertices together with named and anonymous edges:

```  "anonymous and named elements" should "be used to find all vertices with followers except user3" in {
val graph = testGraph

val matches = graph.find("()-[follows]->(user)")
.select("user.name")
.where(raw"""follows.label = "follows" AND user.name != "user3" """)

val followedUsers = matches.collect().map(row => row.getAs[String]("name"))
followedUsers should have size 2
followedUsers should contain allOf("user2", "user4")
}
```

## Motifs finding implementation

As you can see, the motifs finding is very powerful. You can chain the patterns with ";", negate them and also apply the native Apache Spark SQL operations. It's quite a lot for a feature that at first glance looks complex. It's then important to know what happens under-the-hood. I will use the last example of the previous section for the analysis. The graph.find("(user_1)-[follows]->(user_2)").explain(true) query returns the following execution plans:

```== Optimized Logical Plan ==
Project [user_1#120, follows#118, user_2#122]
+- Join Inner, (follows#118.dst = user_2#122.id)
:- Join Inner, (follows#118.src = user_1#120.id)
:  :- LocalRelation [follows#118]
:  +- LocalRelation [user_1#120]
+- LocalRelation [user_2#122]

== Physical Plan ==
*(1) Project [user_1#120, follows#118, user_2#122]
+- *(1) BroadcastHashJoin [follows#118.dst], [user_2#122.id], Inner, BuildRight
:- *(1) BroadcastHashJoin [follows#118.src], [user_1#120.id], Inner, BuildRight
:  :- LocalTableScan [follows#118]
:     +- LocalTableScan [user_1#120]
+- LocalTableScan [user_2#122]
```

As you can notice, under-the-hood the engine executes the motifs finding as multiple joins. It looks fine for the simple query but become much more complex for the second type of the query launched in the previous section which is the query with a negation. The graph.find("(user_1)-[follows]->(user_2); !(user_2)-[]->(user_1)").explain(true) gives the following execution plan:

```== Optimized Logical Plan ==
Aggregate [follows#22, user_1#24, user_2#26], [user_1#24, follows#22, user_2#26]
+- Join LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
:- Join Inner, (follows#22.dst = user_2#26.id)
:  :- Join Inner, (follows#22.src = user_1#24.id)
:  :  :- LocalRelation [follows#22]
:  :  +- LocalRelation [user_1#24]
:  +- LocalRelation [user_2#26]
+- Project [follows#68, user_1#71, user_2#73]
+- Join Inner, ((__tmp-4363599943432734077#47.src = user_2#73.id) && (__tmp-4363599943432734077#47.dst = user_1#71.id))
:- Join Inner, (follows#68.dst = user_2#73.id)
:  :- Join Inner, (follows#68.src = user_1#71.id)
:  :  :- LocalRelation [follows#68]
:  :  +- LocalRelation [user_1#71]
:  +- LocalRelation [user_2#73]
+- LocalRelation [__tmp-4363599943432734077#47]

== Physical Plan ==
*(6) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[user_1#24, follows#22, user_2#26])
+- Exchange hashpartitioning(follows#22, user_1#24, user_2#26, 200)
+- *(5) HashAggregate(keys=[follows#22, user_1#24, user_2#26], functions=[], output=[follows#22, user_1#24, user_2#26])
+- SortMergeJoin [coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,])], [coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,])], LeftAnti, (((follows#22 <=> follows#68) && (user_1#24 <=> user_1#71)) && (user_2#26 <=> user_2#73))
:- *(2) Sort [coalesce(follows#22, [0,0,]) ASC NULLS FIRST, coalesce(user_1#24, [0,]) ASC NULLS FIRST, coalesce(user_2#26, [0,]) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(coalesce(follows#22, [0,0,]), coalesce(user_1#24, [0,]), coalesce(user_2#26, [0,]), 200)
:     +- *(1) BroadcastHashJoin [follows#22.dst], [user_2#26.id], Inner, BuildRight
:        :- *(1) BroadcastHashJoin [follows#22.src], [user_1#24.id], Inner, BuildRight
:        :  :- LocalTableScan [follows#22]
:        :     +- LocalTableScan [user_1#24]
:           +- LocalTableScan [user_2#26]
+- *(4) Sort [coalesce(follows#68, [0,0,]) ASC NULLS FIRST, coalesce(user_1#71, [0,]) ASC NULLS FIRST, coalesce(user_2#73, [0,]) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(coalesce(follows#68, [0,0,]), coalesce(user_1#71, [0,]), coalesce(user_2#73, [0,]), 200)
+- *(3) Project [follows#68, user_1#71, user_2#73]
+- *(3) BroadcastHashJoin [user_2#73.id, user_1#71.id], [__tmp-4363599943432734077#47.src, __tmp-4363599943432734077#47.dst], Inner, BuildRight
:- *(3) BroadcastHashJoin [follows#68.dst], [user_2#73.id], Inner, BuildRight
:  :- *(3) BroadcastHashJoin [follows#68.src], [user_1#71.id], Inner, BuildRight
:  :  :- LocalTableScan [follows#68]