Apache Spark and window functions

on waitingforcode.com

One of the previous posts in the SQL category presented window functions, which can be used to compute values over groups of related rows. These analytic functions are also available in Apache Spark SQL.

The post is organized in 3 parts. The first one lists all window functions available in Apache Spark 2.3.1 and shows their use through small code samples. The next one focuses on the execution plan of such queries by explaining 3 main components of physical execution. Finally, the last section digs a little bit deeper and presents some internal details of the computation.

Window functions list

Window functions were described in detail in the post about window functions in SQL. Here we'll just recall that they're functions applied on rows logically grouped in different window frames. In order to illustrate them in Apache Spark SQL we'll take the example of the last football World Cup (2018) and the list of the best scorers and assist makers for 5 countries (France, Russia, Belgium, England and Portugal). The DataFrame used in the code snippets looks like:

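// Player isn't shown in the original snippet; the fields below are inferred from
// the columns visible in the execution plan (name, team, goals, assists)
case class Player(name: String, team: String, goals: Int, assists: Int)

import sparkSession.implicits._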
private val WorldCupPlayers = Seq(
  Player("Harry Kane", "England", 6, 0),
  Player("Cristiano Ronaldo", "Portugal", 4, 1),
  Player("Antoine Griezmann", "France", 4, 2),
  Player("Romelu Lukaku", "Belgium", 4, 1),
  Player("Denis Cheryshev", "Russia", 4, 0),
  Player("Kylian MBappe", "France", 4, 0),
  Player("Eden Hazard", "Belgium", 3, 2),
  Player("Artem Dzuyba", "Russia", 3, 2),
  Player("John Stones", "England", 2, 0),
  Player("Kevin De Bruyne", "Belgium", 1, 3),
  Player("Aleksandr Golovin", "Russia", 1, 2),
  Player("Paul Pogba", "France", 1, 0),
  Player("Pepe", "Portugal", 1, 0),
  Player("Ricardo Quaresma", "Portugal", 1, 0),
  Player("Dele Alli", "England", 1, 0)
).toDF()

To see the list of available window functions we can go through org.apache.spark.sql.functions and look for the methods annotated with @group window_funcs. It gives a list similar to this one:

  • lag(e: Column, offset: Int, defaultValue: Any), lead(e: Column, offset: Int) - two functions with opposite roles. Both create an extra column holding the value of the expression passed as the first argument, read from another row of the same window partition. lag takes it from the row located offset row(s) before the current row, while lead takes it from the row located offset row(s) after the current one. When no such row exists, defaultValue is returned (null if it isn't specified). The two examples below show this difference:
    it should "link to the next scorer in the team" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // it'll return a value x places after current row
      val theBestScorers = lead($"name", 1, "-").over(theBestScorersWindow)
    
      // for simplicity, the case when several players have the same number of goals isn't handled specifically
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("next_scorer")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[String]("next_scorer")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, "Artem Dzuyba"), ("Artem Dzuyba", 3, "Aleksandr Golovin"), ("Aleksandr Golovin",1, "-"),
        ("Antoine Griezmann", 4, "Kylian MBappe"), ("Kylian MBappe", 4, "Paul Pogba"), ("Paul Pogba", 1, "-"),
        ("Romelu Lukaku", 4, "Eden Hazard"), ("Eden Hazard", 3, "Kevin De Bruyne"), ("Kevin De Bruyne", 1, "-"),
        ("Harry Kane", 6, "John Stones"), ("John Stones", 2, "Dele Alli"), ("Dele Alli", 1, "-"),
        ("Cristiano Ronaldo", 4, "Pepe"), ("Pepe", 1, "Ricardo Quaresma"), ("Ricardo Quaresma", 1, "-")
      )
    }
    
    it should "link to the previous scorer in the team" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // it'll return "lag" value x places before current row - it helps to build a sequence between rows
      val theBestScorers = lag($"name", 1, "-").over(theBestScorersWindow)
    
      // for simplicity, the case when several players have the same number of goals isn't handled specifically
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("previous_scorer")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[String]("previous_scorer")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, "-"), ("Artem Dzuyba", 3, "Denis Cheryshev"), ("Aleksandr Golovin", 1, "Artem Dzuyba"),
        ("Antoine Griezmann", 4, "-"), ("Kylian MBappe", 4, "Antoine Griezmann"), ("Paul Pogba", 1, "Kylian MBappe"),
        ("Romelu Lukaku", 4, "-"), ("Eden Hazard", 3, "Romelu Lukaku"), ("Kevin De Bruyne", 1, "Eden Hazard"),
        ("Harry Kane", 6, "-"), ("John Stones", 2, "Harry Kane"), ("Dele Alli", 1, "John Stones"),
        ("Cristiano Ronaldo", 4, "-"), ("Pepe", 1, "Cristiano Ronaldo"), ("Ricardo Quaresma", 1, "Pepe")
      )
    }
    
  • rank(), dense_rank() - these methods generate a column with the rank of a given row within its window partition. The difference is that, unlike rank(), dense_rank() doesn't leave gaps after ties. It means that for the values (5, 5, 2) rank() will generate (1, 1, 3) while dense_rank() will generate (1, 1, 2). Two tests emphasize that difference:
    it should "rank goals per team" in {
      val goalsPerTeam = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // the difference with rank() is that dense_rank() keeps incremental number if two or more rows
      // are in the same position. For instance rank() will produce a result like (1, 2, 2, 4) while dense_rank() like
      // (1, 2, 2, 3)
      val goalsPerPlayerInTeam = rank().over(goalsPerTeam)
    
      val scorers = WorldCupPlayers.select($"*", goalsPerPlayerInTeam.as("scorer_position")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Int]("scorer_position")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 1), ("Artem Dzuyba", 3, 2), ("Aleksandr Golovin", 1, 3),
        ("Antoine Griezmann", 4, 1), ("Kylian MBappe", 4, 1), ("Paul Pogba", 1, 3),
        ("Romelu Lukaku", 4, 1), ("Eden Hazard", 3, 2), ("Kevin De Bruyne", 1, 3),
        ("Harry Kane", 6, 1), ("John Stones", 2, 2), ("Dele Alli", 1, 3),
        ("Cristiano Ronaldo", 4, 1), ("Pepe", 1, 2), ("Ricardo Quaresma", 1, 2)
      )
    }
    
    it should "dense rank goals per team" in {
      val goalsPerTeam = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // the difference with rank() is that dense_rank() keeps incremental number if two or more rows
      // are in the same position. For instance rank() will produce a result like (1, 2, 2, 4) while dense_rank() like
      // (1, 2, 2, 3)
      val goalsPerPlayerInTeam = dense_rank().over(goalsPerTeam)
    
      val scorers = WorldCupPlayers.select($"*", goalsPerPlayerInTeam.as("scorer_position")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Int]("scorer_position")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 1), ("Artem Dzuyba", 3, 2), ("Aleksandr Golovin", 1, 3),
        ("Antoine Griezmann", 4, 1), ("Kylian MBappe", 4, 1), ("Paul Pogba", 1, 2),
        ("Romelu Lukaku", 4, 1), ("Eden Hazard", 3, 2), ("Kevin De Bruyne", 1, 3),
        ("Harry Kane", 6, 1), ("John Stones", 2, 2), ("Dele Alli", 1, 3),
        ("Cristiano Ronaldo", 4, 1), ("Pepe", 1, 2), ("Ricardo Quaresma", 1, 2)
      )
    }
    
  • row_number() - it returns the sequential number of each row, starting at 1, within its window partition. Unlike rank(), tied rows still receive different numbers:
    it should "return row number for each grouped player" in {
      val goalsPerTeam = Window.partitionBy($"team").orderBy($"goals".desc)
    
      val goalsPerPlayerInTeam = row_number().over(goalsPerTeam)
    
      val scorers = WorldCupPlayers.select($"*", goalsPerPlayerInTeam.as("nr")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Int]("nr")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 1), ("Artem Dzuyba", 3, 2), ("Aleksandr Golovin", 1, 3),
        ("Antoine Griezmann", 4, 1), ("Kylian MBappe", 4, 2), ("Paul Pogba", 1, 3),
        ("Romelu Lukaku", 4, 1), ("Eden Hazard", 3, 2), ("Kevin De Bruyne", 1, 3),
        ("Harry Kane", 6, 1), ("John Stones", 2, 2), ("Dele Alli", 1, 3),
        ("Cristiano Ronaldo", 4, 1), ("Pepe", 1, 2), ("Ricardo Quaresma", 1, 3)
      )
    }
    
  • ntile(n: Int) - this function distributes the ordered rows of each partition into n groups of sizes as equal as possible and returns the group number of each row:
    it should "put players in 2 groups depending on scored goals" in {
      val theBestScorersWindow = Window.partitionBy($"goals").orderBy($"name".asc)
    
      val groups = 2
      val theBestScorers = ntile(groups).over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("group")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Int]("group")
      )).collect()
    
      println(s"scorers=${scorers.mkString(",")}")
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Aleksandr Golovin", 1, 1), ("Dele Alli", 1, 1), ("Kevin De Bruyne", 1, 1), ("Paul Pogba", 1, 2), ("Pepe", 1, 2),
        ("Ricardo Quaresma", 1, 2), ("Harry Kane", 6, 1), ("Artem Dzuyba", 3, 1), ("Eden Hazard", 3, 2),
        ("Antoine Griezmann", 4, 1), ("Cristiano Ronaldo", 4, 1), ("Denis Cheryshev", 4, 1), ("Kylian MBappe", 4, 2),
        ("Romelu Lukaku", 4, 2), ("John Stones", 2, 1)
      )
    }
    
  • aggregations, such as avg(e: Column) - aggregation functions can also be applied on window partitions (= aggregation applied on rows grouped by a key, except that every input row is kept in the output):
    it should "return average goals per team" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"team")
    
      val avgGoalsPerPlayerInTeam = avg($"goals").over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", avgGoalsPerPlayerInTeam.as("avg_goals")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("avg_goals")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 2.6666666666666665), ("Artem Dzuyba", 3, 2.6666666666666665),
        ("Aleksandr Golovin", 1, 2.6666666666666665),
        ("Antoine Griezmann", 4, 3.0), ("Kylian MBappe", 4, 3.0), ("Paul Pogba", 1, 3.0),
        ("Romelu Lukaku", 4, 2.6666666666666665), ("Eden Hazard", 3, 2.6666666666666665),
        ("Kevin De Bruyne", 1, 2.6666666666666665),
        ("Harry Kane", 6, 3.0), ("John Stones", 2, 3.0), ("Dele Alli", 1, 3.0),
        ("Cristiano Ronaldo", 4, 2.0), ("Pepe", 1, 2.0), ("Ricardo Quaresma", 1, 2.0)
      )
    }
    
  • cume_dist() - the cumulative distribution of a given row within the partition, current row included. It's computed as cume_dist(x) = number of rows located before x or tied with x in the partition ordering / N, where N is the total number of rows in the partition. Tied rows therefore share the same result:
    it should "return the number of rows below each scorer" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // cume_dist() => returns the fraction of partition rows located before the current row or tied with it (current row included)
      val theBestScorers = cume_dist().over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("players_below")).map(row => {
        (row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("players_below"))
      }).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 0.3333333333333333), ("Artem Dzuyba", 3, 0.6666666666666666), ("Aleksandr Golovin", 1, 1.0),
        ("Antoine Griezmann", 4, 0.6666666666666666), ("Kylian MBappe", 4, 0.6666666666666666), ("Paul Pogba", 1, 1.0),
        ("Romelu Lukaku", 4, 0.3333333333333333), ("Eden Hazard", 3, 0.6666666666666666), ("Kevin De Bruyne", 1, 1.0),
        ("Harry Kane", 6, 0.3333333333333333), ("John Stones", 2, 0.6666666666666666), ("Dele Alli", 1, 1.0),
        ("Cristiano Ronaldo", 4, 0.3333333333333333), ("Pepe", 1, 1.0), ("Ricardo Quaresma", 1, 1.0)
      )
    }
    
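  • percent_rank() - computes the relative rank of a row within its window partition, expressed as a fraction between 0 and 1: (rank - 1) / (N - 1), where N is the number of rows in the partition: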
    it should "compute relative rank for scorer within window frame" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
    
      // percent_rank() => returns the relative rank as a fraction between 0.0 and 1.0, computed as
      // (rank - 1) / (number of rows in the window partition - 1)
      val theBestScorers = percent_rank().over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("relative_position_from_the_best")).map(row => {
        (row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("relative_position_from_the_best"))
      }).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 0.0), ("Artem Dzuyba", 3, 0.5), ("Aleksandr Golovin", 1, 1.0),
        ("Antoine Griezmann", 4, 0.0), ("Kylian MBappe", 4, 0.0), ("Paul Pogba", 1, 1.0),
        ("Romelu Lukaku", 4, 0.0), ("Eden Hazard", 3, 0.5), ("Kevin De Bruyne", 1, 1.0),
        ("Harry Kane", 6, 0.0), ("John Stones", 2, 0.5), ("Dele Alli", 1, 1.0),
        ("Cristiano Ronaldo", 4, 0.0), ("Pepe", 1, 0.5), ("Ricardo Quaresma", 1, 0.5)
      )
    }
    
  • currentRow(), unboundedPreceding() and unboundedFollowing() - these methods can be combined in range frames to define flexible boundaries for the data used per partition. For instance, using the last 2 as boundaries means "take all rows within a partition and apply the defined aggregation on them", as in the first test below. Similarly, currentRow() makes the frame start or end at the current row, as shown in the second test:
    it should "take whole frame with unbounded functions added in Apache Spark 2.3.0" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
        .rangeBetween(unboundedPreceding(), unboundedFollowing())
    
      val theBestScorers = count($"name").over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("all_players_in_team")).map(row => {
        (row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Long]("all_players_in_team"))
      }).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 3), ("Artem Dzuyba", 3, 3), ("Aleksandr Golovin", 1, 3),
        ("Antoine Griezmann", 4, 3), ("Kylian MBappe", 4, 3), ("Paul Pogba", 1, 3),
        ("Romelu Lukaku", 4, 3), ("Eden Hazard", 3, 3), ("Kevin De Bruyne", 1, 3),
        ("Harry Kane", 6, 3), ("John Stones", 2, 3), ("Dele Alli", 1, 3),
        ("Cristiano Ronaldo", 4, 3), ("Pepe", 1, 3), ("Ricardo Quaresma", 1, 3)
      )
    }
    
    it should "take a frame bounded by current row with new function added in Apache Spark 2.3.0" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
        .rangeBetween(currentRow(), unboundedFollowing())
    
      val theBestScorers = count($"name").over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("all_players_in_team")).map(row => {
        (row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Long]("all_players_in_team"))
      }).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 3), ("Artem Dzuyba", 3, 2), ("Aleksandr Golovin", 1, 1),
        ("Antoine Griezmann", 4, 3), ("Kylian MBappe", 4, 3), ("Paul Pogba", 1, 1),
        ("Romelu Lukaku", 4, 3), ("Eden Hazard", 3, 2), ("Kevin De Bruyne", 1, 1),
        ("Harry Kane", 6, 3), ("John Stones", 2, 2), ("Dele Alli", 1, 1),
        ("Cristiano Ronaldo", 4, 3), ("Pepe", 1, 2), ("Ricardo Quaresma", 1, 2)
      )
    }
    

As you can see throughout the above examples, window functions are constructed with the org.apache.spark.sql.expressions.Window object, which exposes the required factory methods: partitionBy(cols: Column*) and orderBy(cols: Column*). They build an instance of org.apache.spark.sql.expressions.WindowSpec that is later passed to over(...) in select expressions.
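The arithmetic behind the ranking functions from the list can be checked by hand on a plain Scala collection. The sketch below is not Spark code: RankingSketch and its method names are hypothetical, and it assumes a single window partition whose ordering values are already sorted in descending order (here the goals of the Portuguese players). It only mirrors the definitions recalled in the list.

```scala
object RankingSketch {
  // Goals of one partition, assumed to be sorted in descending order already
  val goals: Vector[Int] = Vector(4, 1, 1)

  // row_number(): 1-based position, tied rows still get different numbers
  def rowNumber(sorted: Vector[Int]): Vector[Int] =
    sorted.indices.map(_ + 1).toVector

  // rank(): 1 + number of rows strictly ahead, so ties share a rank and leave gaps
  def rank(sorted: Vector[Int]): Vector[Int] =
    sorted.map(value => sorted.count(_ > value) + 1)

  // dense_rank(): 1 + number of distinct values strictly ahead, so no gaps
  def denseRank(sorted: Vector[Int]): Vector[Int] =
    sorted.map(value => sorted.filter(_ > value).distinct.size + 1)

  // percent_rank(): (rank - 1) / (N - 1); 0.0 when the partition has a single row
  def percentRank(sorted: Vector[Int]): Vector[Double] = {
    val n = sorted.size
    rank(sorted).map(r => if (n > 1) (r - 1).toDouble / (n - 1) else 0.0)
  }

  // cume_dist(): rows located before the current one or tied with it, divided by N
  def cumeDist(sorted: Vector[Int]): Vector[Double] =
    sorted.map(value => sorted.count(_ >= value).toDouble / sorted.size)

  // ntile(buckets): bucket number per row position; the first (size % buckets)
  // buckets receive one extra row
  def ntile(size: Int, buckets: Int): Vector[Int] = {
    val base = size / buckets
    val extra = size % buckets
    val threshold = extra * (base + 1)
    (0 until size).map { i =>
      if (i < threshold) i / (base + 1) + 1
      else (i - threshold) / base + 1 + extra
    }.toVector
  }
}
```

For the partition (4, 1, 1) this gives rank (1, 2, 2), dense rank (1, 2, 2), row numbers (1, 2, 3), relative ranks (0.0, 0.5, 0.5) and cumulative distributions (1/3, 1.0, 1.0), the same values as the Portuguese rows in the tests. ntile(5, 2) returns (1, 1, 1, 2, 2), which matches the group of players with 4 goals.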

Execution plan

The execution plan for one of the presented examples looks like:

== Physical Plan ==
Window [lead(name#4, 1, -) windowspecdefinition(team#5, goals#6 DESC NULLS LAST, specifiedwindowframe(RowFrame, 1, 1)) AS next_scorer#13], [team#5], [goals#6 DESC NULLS LAST]
+- *(1) Sort [team#5 ASC NULLS FIRST, goals#6 DESC NULLS LAST], false, 0
   +- Exchange hashpartitioning(team#5, 200)
      +- LocalTableScan [name#4, team#5, goals#6, assists#7]

It gives 3 important insights. The first one is about data movement. Unsurprisingly, window functions require a shuffle (Exchange hashpartitioning), here partitioned by the team field. The second point is sorting: the rows of each partition must be sorted before the window functions can be evaluated. These 2 properties already show that executing window functions can be expensive in terms of computation time and resources. The last important concept is the Window operator itself, displayed at the top of the plan. As you can see, it involves 3 elements:

  • org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition - defines window components and specifications. In our example we can retrieve the specifications for: partitioning (team#5), ordering (goals#6 DESC NULLS LAST) and window frame (specifiedwindowframe(RowFrame, 1, 1)).
  • org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame - it's the last fragment of the previous point: specifiedwindowframe(RowFrame, 1, 1). The first parameter defines the frame type, which can be RowFrame or RangeFrame. The former is described in the next point while the latter is presented here. A RangeFrame groups rows by logical offsets computed on the value of the ordering column. Since the data within a partition is ordered, we can easily take the rows whose ordering value is, compared to the currently read row, at most x smaller and at most y bigger. For instance, here we sum the goals of the scorers having up to 2 goals less or up to 1 goal more than the player in the current row (current row included):
    it should "apply custom range frame for sum goals" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".asc)
    
      val range = theBestScorersWindow.rangeBetween(-2, 1)
    
      val theBestScorers = sum($"goals").over(range)
    
      // for the simplicity, the case when several player have
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("range_sum")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Long]("range_sum")
      )).collect()
    
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Aleksandr Golovin", 1, 1), ("Artem Dzuyba", 3, 8), ("Denis Cheryshev", 4, 7),
        ("Paul Pogba", 1, 1), ("Antoine Griezmann", 4, 8), ("Kylian MBappe", 4, 8),
        ("Kevin De Bruyne", 1, 1), ("Eden Hazard", 3, 8), ("Romelu Lukaku", 4, 7),
        ("Dele Alli", 1, 3), ("John Stones", 2, 3), ("Harry Kane", 6, 6),
        ("Pepe", 1, 2), ("Ricardo Quaresma", 1, 2), ("Cristiano Ronaldo", 4, 4)
      )
    }
        
  • org.apache.spark.sql.catalyst.expressions.RowFrame - the frame from our example addresses partition rows by their position and not by a column value. It means that with RowFrame we can take x rows before the current row and y rows after it. In the case of our analyzed plan we always look for the 1 row succeeding the current row: (RowFrame, 1, 1). We could also use "0" to indicate that only the current row interests us, or a negative number to indicate x rows preceding the current one. Therefore a row frame is based on physical offsets and not on logical ones.
    Some functions impose fixed frames, as for instance lead, which is always represented as a RowFrame built from its offset (e.g. (RowFrame, 2, 2) would be created for lead($"name", 2, "-")). In other cases this behavior can be configured with the rowsBetween(start: Long, end: Long) method, especially for aggregations:
    it should "apply custom row frame for average aggregation" in {
      val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc).rowsBetween(-1, 1)
    
      val theBestScorers = avg($"goals").over(theBestScorersWindow)
    
      val scorers = WorldCupPlayers.select($"*", theBestScorers.as("rows_avg")).map(row => (
        row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("rows_avg")
      )).collect()
    
      // For the first row we'll never get the row before (-1). The same applies for the last row that will never
      // have an access to the next row (1). It's why the averages aren't the same for all players
      // It's a kind of sliding window according to physical offsets
      scorers should have size WorldCupPlayers.count()
      scorers should contain allOf(
        ("Denis Cheryshev", 4, 3.5), ("Artem Dzuyba", 3, 2.6666666666666665), ("Aleksandr Golovin", 1, 2.0),
        ("Antoine Griezmann", 4, 4.0), ("Kylian MBappe", 4, 3.0), ("Paul Pogba", 1, 2.5),
        ("Romelu Lukaku", 4, 3.5), ("Eden Hazard", 3, 2.6666666666666665), ("Kevin De Bruyne", 1, 2.0),
        ("Harry Kane", 6, 4.0), ("John Stones", 2, 3.0), ("Dele Alli", 1, 1.5),
        ("Cristiano Ronaldo", 4, 2.5), ("Pepe", 1, 2.0), ("Ricardo Quaresma", 1, 1.0)
      )
    }
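The difference between logical offsets (RangeFrame) and physical offsets (RowFrame) can also be reproduced without Spark. The following sketch uses plain Scala collections; FrameSketch, rangeSum and rowsAvg are hypothetical names, and the input is assumed to be a single, already sorted partition (the goals of the Russian players). It illustrates the semantics only, not the way Spark implements them.

```scala
object FrameSketch {
  // Range frame: the frame of a row holds every row whose ordering value lies in
  // [current + start, current + end]. The input is assumed to be sorted ascending.
  def rangeSum(sortedAsc: Vector[Int], start: Int, end: Int): Vector[Int] =
    sortedAsc.map { current =>
      sortedAsc.filter(v => v >= current + start && v <= current + end).sum
    }

  // Row frame: the frame of a row holds the rows located between `start` and `end`
  // positions around it, truncated at the partition boundaries.
  // Assumes start <= 0 <= end, so a frame is never empty.
  def rowsAvg(sorted: Vector[Int], start: Int, end: Int): Vector[Double] =
    sorted.indices.map { i =>
      val from = math.max(0, i + start)
      val until = math.min(sorted.size, i + end + 1)
      val frame = sorted.slice(from, until)
      frame.sum.toDouble / frame.size
    }.toVector
}
```

With the Russian goals sorted in ascending order, FrameSketch.rangeSum(Vector(1, 3, 4), -2, 1) returns (1, 8, 7), as in the range frame test. With the same goals sorted in descending order, FrameSketch.rowsAvg(Vector(4, 3, 1), -1, 1) returns (3.5, 2.6666666666666665, 2.0), as in the row frame test.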
    

Window internals

The class responsible for the execution of window functions is WindowExec. More exactly, its doExecute() method gives some insight into how windowed functions are executed. The processing consists in applying org.apache.spark.rdd.RDD#mapPartitions([U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false)) on all partitions. The iterator returned by this method moves from one partition group to another and applies all of the defined window frames to each row:

val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
val frames = factories.map(_(windowFunctionResult))
val numFrames = frames.length

override final def next(): InternalRow = {
  // Load the next partition if we need to.
  if ((bufferIterator == null || !bufferIterator.hasNext) && nextRowAvailable) {
    fetchNextPartition()
  }
  // ...
  val current = bufferIterator.next()

    // Get the results for the window frames.
    var i = 0
    while (i < numFrames) {
      frames(i).write(rowIndex, current)
      i += 1
    }

    // 'Merge' the input row with the window function result
    join(current, windowFunctionResult)
    rowIndex += 1

    // Return the projection.
    result(join)
  // ...
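The shape of this iterator can be imitated in a few lines of plain Scala. The sketch below is a strong simplification and not the Spark implementation: WindowIteratorSketch, Row and windowed are hypothetical names, a frame is modelled as a function taking the buffered partition group and the index of the current row, and the input is assumed to arrive sorted by the partitioning key, as guaranteed by the Sort step of the plan. The real operator works on InternalRow and is considerably more involved.

```scala
object WindowIteratorSketch {
  // Hypothetical simplified row: a partitioning key and a value to aggregate
  final case class Row(team: String, goals: Int)

  // A "frame" computes one result from the buffered group and the current row index
  type Frame = (Vector[Row], Int) => Double

  def windowed(rows: Iterator[Row], frames: Seq[Frame]): Iterator[(Row, Seq[Double])] = {
    val input = rows.buffered
    new Iterator[(Row, Seq[Double])] {
      private var buffer: Vector[Row] = Vector.empty
      private var rowIndex = 0

      // Buffers all consecutive rows sharing the same partitioning key
      private def fetchNextPartition(): Unit = {
        val key = input.head.team
        val group = Vector.newBuilder[Row]
        while (input.hasNext && input.head.team == key) group += input.next()
        buffer = group.result()
        rowIndex = 0
      }

      override def hasNext: Boolean = rowIndex < buffer.size || input.hasNext

      override def next(): (Row, Seq[Double]) = {
        // Load the next partition group once the current one is exhausted
        if (rowIndex >= buffer.size) fetchNextPartition()
        val current = buffer(rowIndex)
        // Evaluate every frame for the current row
        val results = frames.map(frame => frame(buffer, rowIndex))
        rowIndex += 1
        // "Join" the input row with the window function results
        (current, results)
      }
    }
  }
}
```

Fed with the three French players (4, 4 and 1 goals) followed by Harry Kane (6 goals), a whole-group average and a running sum as frames would produce (3.0, 4.0), (3.0, 8.0) and (3.0, 9.0) for France, then (6.0, 6.0) for England.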

The "how" of the frame computation is handled by windowFrameExpressionFactoryPairs, which pairs each frame expression with a factory method creating the corresponding computation. For the 2 previously described frame types (RowFrame and RangeFrame), 5 kinds of frames can be created:

  • offset frame - internally represented as OffsetWindowFunctionFrame. It's created for lead and lag statements.
  • entire partition frame - as the name suggests, this frame covers all entries of each partition. In consequence the computed result will be the same for every entry of the partition. It's represented as UnboundedWindowFunctionFrame. An example of such a frame can look like:
      "entire partition frame" should "be created with unbounded preceding and following expressions" in {
        val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
        val theBestScorers = avg($"goals").over(theBestScorersWindow)
    
        val scorers = WorldCupPlayers.select($"*", theBestScorers.as("rows_avg")).map(row => (
          row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("rows_avg")
        )).collect()
    
        scorers should have size WorldCupPlayers.count()
        scorers should contain allOf(
          ("Denis Cheryshev", 4, 2.6666666666666665), ("Artem Dzuyba", 3, 2.6666666666666665), ("Aleksandr Golovin", 1, 2.6666666666666665),
          ("Antoine Griezmann", 4, 3.0), ("Kylian MBappe", 4, 3.0), ("Paul Pogba", 1, 3.0),
          ("Romelu Lukaku", 4, 2.6666666666666665), ("Eden Hazard", 3, 2.6666666666666665), ("Kevin De Bruyne", 1, 2.6666666666666665),
          ("Harry Kane", 6, 3.0), ("John Stones", 2, 3.0), ("Dele Alli", 1, 3.0),
          ("Cristiano Ronaldo", 4, 2.0), ("Pepe", 1, 2.0), ("Ricardo Quaresma", 1, 2.0)
        )
      }
    
  • growing frame - the name of this frame comes from the fact that at every iterated row we have 1 additional row in the processing. For instance if we take a partition with elements (A, B, C), the first row frame will contain only (A), the second one (A, B) and finally the last one all items (A, B, C). The implementation of this frame is UnboundedPrecedingWindowFunctionFrame, for example:
      "growing frame" should "be created with unbounded preceding and current row expression" in {
        val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
        val theBestScorers = avg($"goals").over(theBestScorersWindow)
    
        val scorers = WorldCupPlayers.select($"*", theBestScorers.as("rows_avg")).map(row => (
          row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("rows_avg")
        )).collect()
    
        
        scorers should have size WorldCupPlayers.count()
        scorers should contain allOf(
          ("Denis Cheryshev", 4, 4.0), ("Artem Dzuyba", 3, 3.5), ("Aleksandr Golovin", 1, 2.6666666666666665),
          ("Antoine Griezmann", 4, 4.0), ("Kylian MBappe", 4, 4.0), ("Paul Pogba", 1, 3.0),
          ("Romelu Lukaku", 4, 4.0), ("Eden Hazard", 3, 3.5), ("Kevin De Bruyne", 1, 2.6666666666666665),
          ("Harry Kane", 6, 6.0), ("John Stones", 2, 4.0), ("Dele Alli", 1, 3.0),
          ("Cristiano Ronaldo", 4, 4.0), ("Pepe", 1, 2.5), ("Ricardo Quaresma", 1, 2.0)
        )
      }
    
  • shrinking frame - it's the opposite of the previous frame: the frame of every row contains one row less than the frame of the previous row. Once again, in our (A, B, C) example, the first frame will have (A, B, C), the second one only (B, C) and the third one (C). Concretely it's handled by UnboundedFollowingWindowFunctionFrame and an example looks like:
      "shrinking frame" should "be created with current row and unbounded following expression" in {
        val theBestScorersWindow = Window.partitionBy($"team").orderBy($"goals".desc)
          .rowsBetween(Window.currentRow, Window.unboundedFollowing)
    
        val theBestScorers = avg($"goals").over(theBestScorersWindow)
    
        val scorers = WorldCupPlayers.select($"*", theBestScorers.as("rows_avg")).map(row => (
          row.getAs[String]("name"), row.getAs[Int]("goals"), row.getAs[Double]("rows_avg")
        )).collect()
    
        println(s"scorers=${scorers.mkString(",")}")
        scorers should have size WorldCupPlayers.count()
        scorers should contain allOf(
          ("Denis Cheryshev", 4, 2.6666666666666665), ("Artem Dzuyba", 3, 2.0), ("Aleksandr Golovin", 1, 1.0),
          ("Antoine Griezmann", 4, 3.0), ("Kylian MBappe", 4, 2.5), ("Paul Pogba", 1, 1.0),
          ("Romelu Lukaku", 4, 2.6666666666666665), ("Eden Hazard", 3, 2.0), ("Kevin De Bruyne", 1, 1.0),
          ("Harry Kane", 6, 3.0), ("John Stones", 2, 1.5), ("Dele Alli", 1, 1.0),
          ("Cristiano Ronaldo", 4, 2.0), ("Pepe", 1, 1.0), ("Ricardo Quaresma", 1, 1.0)
        )
      }
    
  • moving frame - here, for every row, the x previous and y next row(s) are used in the aggregation. For (A, B, C) and x = y = 1 we'll have the following frames: (A, B), (A, B, C) and (B, C). It's represented by SlidingWindowFunctionFrame and is used internally by the already presented rangeBetween and rowsBetween methods when both boundaries are bounded.
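The 5 frame kinds can be summarized by listing, for each row of the (A, B, C) partition, the rows visible in its frame. The snippet is only an illustration in plain Scala with hypothetical names (FrameShapesSketch and its methods); it mirrors row-based boundaries and says nothing about how the corresponding Spark classes are implemented.

```scala
object FrameShapesSketch {
  val partition: Vector[String] = Vector("A", "B", "C")

  // entire partition frame: unbounded preceding to unbounded following
  def entire(p: Vector[String]): Vector[Vector[String]] =
    p.indices.map(_ => p).toVector

  // growing frame: unbounded preceding to current row
  def growing(p: Vector[String]): Vector[Vector[String]] =
    p.indices.map(i => p.take(i + 1)).toVector

  // shrinking frame: current row to unbounded following
  def shrinking(p: Vector[String]): Vector[Vector[String]] =
    p.indices.map(i => p.drop(i)).toVector

  // moving frame: `before` previous rows to `after` next rows
  def moving(p: Vector[String], before: Int, after: Int): Vector[Vector[String]] =
    p.indices.map { i =>
      p.slice(math.max(0, i - before), math.min(p.size, i + after + 1))
    }.toVector

  // offset frame: a single row at a fixed distance (positive like lead, negative
  // like lag), or a default value when that row falls outside of the partition
  def offset(p: Vector[String], distance: Int, default: String): Vector[String] =
    p.indices.map(i => p.lift(i + distance).getOrElse(default)).toVector
}
```

For instance FrameShapesSketch.shrinking(partition) gives (A, B, C), (B, C) and (C), while FrameShapesSketch.offset(partition, 1, "-") gives B, C and "-", which is the behavior of lead with an offset of 1.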

Apache Spark analytical window functions look similar to the aggregations applied on groups. As shown in the first section, these functions have a lot in common with their SQL counterparts. Among them we find lead, lag, rank, ntile and so forth. Even their physical execution is similar to the grouped aggregations. After all, it starts by shuffling all rows with the same partitioning key to the same Apache Spark partition. However, as shown in the 3rd section about internals, window functions are a little bit more flexible thanks to the 5 different frames they provide.
