Stack operation in Apache Spark SQL

Versions: Apache Spark 3.0.1

The pivot operation presented 2 weeks ago transforms some cells into columns. The reverse operation is called stack, and it's time to see how it works!

The structure of this blog post is quite similar to the previous one. The first section shows an example of the STACK operation, and the next two cover the analysis stage and the physical execution.

STACK example

Let's imagine that the PIVOT operation generated the following dataset:

|id |team1|team2|team3|
|1  |30   |300  |3000 |
|2  |50   |500  |5000 |
|3  |100  |1000 |10000|
|4  |200  |2000 |20000|

But our target is the table like:

| id|     team|points|
|  1|team1_new|    30|
|  1|team2_new|   300|
|  1|team3_new|  3000|
|  2|team1_new|    50|
|  2|team2_new|   500|
|  2|team3_new|  5000|
|  3|team1_new|   100|
|  3|team2_new|  1000|
|  3|team3_new| 10000|
|  4|team1_new|   200|
|  4|team2_new|  2000|
|  4|team3_new| 20000|

The function that transforms the source dataset into the target one quite seamlessly is called STACK. It takes two kinds of parameters: first the number of rows to create, then the "list" of (label, column) pairs that will fill these rows:

SELECT id, STACK(3, 'team1_new', team1, 'team2_new', team2, 'team3_new', team3) AS (team, points) FROM pivoted_table
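
To make the semantics of the query concrete, here is a minimal sketch in plain Scala collections, without any Spark dependency. The object and case class names are hypothetical and exist only for this illustration; the sketch mirrors what the query returns, not how Spark computes it.

```scala
// Plain Scala model of the STACK query above (illustration only, not Spark code).
object StackExample {
  // Hypothetical types modelling the source and target tables.
  final case class PivotedRow(id: Int, team1: Int, team2: Int, team3: Int)
  final case class StackedRow(id: Int, team: String, points: Int)

  val pivoted: Seq[PivotedRow] = Seq(
    PivotedRow(1, 30, 300, 3000),
    PivotedRow(2, 50, 500, 5000),
    PivotedRow(3, 100, 1000, 10000),
    PivotedRow(4, 200, 2000, 20000)
  )

  // One input row produces three output rows, one per (label, column) pair.
  def stack(row: PivotedRow): Seq[StackedRow] = Seq(
    StackedRow(row.id, "team1_new", row.team1),
    StackedRow(row.id, "team2_new", row.team2),
    StackedRow(row.id, "team3_new", row.team3)
  )

  val stacked: Seq[StackedRow] = pivoted.flatMap(stack)

  def main(args: Array[String]): Unit =
    stacked.foreach(println)
}
```

The flatMap call is the important part: every source row is expanded independently of the others, which is the same one-to-many shape the target table shows.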

Analysis stage

If you're looking for the STACK operation in the source code, you should find it pretty quickly because it's represented by the ... Stack class. It extends the Generator trait, which makes the stack easy to categorize. In the Scaladoc of the Generator you can read that:

An expression that produces zero or more rows given a single input row.
Generators produce multiple output rows instead of a single value like other expressions, and thus they must have a schema to associate with the rows that are output.

Pretty clear, right? And it makes sense if we apply this description to the Stack node. If you look at our example, you will see that one row, initially composed of (id, team1, team2, team3) columns, was split into 3 rows. As stated in the quoted comment, an important element of this operation is the output schema, which Stack exposes through elementSchema:

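case class Stack(children: Seq[Expression]) extends Generator {
  // ...
  // The first numFields arguments after the row count define the output columns
  override def elementSchema: StructType =
    StructType(children.tail.take(numFields).zipWithIndex.map {
      case (e, index) => StructField(s"col$index", e.dataType)
    })
  // ...
}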
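
The schema logic can be sketched outside Spark as follows. This is a simplified model, assuming that numFields is the number of arguments after the row count divided by the number of rows and rounded up, which is how I read the Spark 3.0.1 source. The object name, the function names and the use of plain strings for data types are hypothetical.

```scala
// Plain Scala model of how Stack derives its default output columns (illustration only).
object StackSchemaSketch {
  // argCount is the number of STACK arguments after the row count.
  def numFields(numRows: Int, argCount: Int): Int =
    math.ceil(argCount.toDouble / numRows).toInt

  // Default column names and types, before the AS (...) alias renames them.
  def defaultColumns(numRows: Int, argTypes: Seq[String]): Seq[String] =
    argTypes.take(numFields(numRows, argTypes.size)).zipWithIndex.map {
      case (dataType, index) => s"col$index: $dataType"
    }
}
```

For the query from the first section, `StackSchemaSketch.defaultColumns(3, Seq("string", "int", "string", "int", "string", "int"))` gives two columns, col0 of type string and col1 of type int, which the AS (team, points) clause then renames.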

The Stack generator uses this schema to validate the "stacked" columns at the analysis stage. This verification logic is implemented in the checkInputDataTypes() method. In addition to basic checks like the number of parameters, it verifies that the data type of every stacked argument matches the data type of the corresponding field of the output schema, which is built from the first (label, column) group ('team1_new', team1 in our query) and later renamed to (team, points). If any of them is incompatible, the method returns a TypeCheckFailure that the analyzer reports as an AnalysisException:

      // i walks over the arguments after the row count, j is the matching output column
      for (i <- 1 until children.length) {
        val j = (i - 1) % numFields
        if (children(i).dataType != elementSchema.fields(j).dataType) {
          return TypeCheckResult.TypeCheckFailure(
            s"Argument ${j + 1} (${elementSchema.fields(j).dataType.catalogString}) != " +
              s"Argument $i (${children(i).dataType.catalogString})")
        }
      }
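
The modulo arithmetic of this check is easier to follow on a small model. The sketch below is plain Scala, not Spark code: data types are represented by strings, and the object and function names are hypothetical. It returns the same kind of message as the excerpt above for the first mismatching argument.

```scala
// Plain Scala model of the per-column type check (illustration only).
object StackTypeCheckSketch {
  // argTypes holds the type names of the STACK arguments after the row count.
  // Returns None when all types line up, or Some(message) for the first mismatch.
  def check(numRows: Int, argTypes: Seq[String]): Option[String] = {
    val numFields = math.ceil(argTypes.size.toDouble / numRows).toInt
    // The first group of arguments defines the expected type of every column.
    val schema = argTypes.take(numFields)
    argTypes.zipWithIndex.collectFirst {
      case (dataType, idx) if dataType != schema(idx % numFields) =>
        val j = idx % numFields
        s"Argument ${j + 1} (${schema(j)}) != Argument ${idx + 1} ($dataType)"
    }
  }
}
```

With the types of the article's query the check passes. With a call shaped like STACK(2, 'a', 1, 'b', 'x'), the fourth argument is a string where the second column expects an int, so the model reports a mismatch between Argument 2 and Argument 4.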

Physical execution

The analyzed and optimized plans for our query look like this:

== Analyzed Logical Plan ==
id: int, team: string, points: int
Project [id#13, team#38, points#39]
+- Generate stack(3, team1_new, team1#14, team2_new, team2#15, team3_new, team3#16), false, [team#38, points#39]
   +- SubqueryAlias pivoted_table
      +- Project [_1#4 AS id#13, _2#5 AS team1#14, _3#6 AS team2#15, _4#7 AS team3#16]
         +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Generate stack(3, team1_new, team1#14, team2_new, team2#15, team3_new, team3#16), [1, 2, 3], false, [team#38, points#39]
+- LocalRelation [id#13, team1#14, team2#15, team3#16]

As you can observe, the stack function is called directly, without the prior aggregation and shuffle that the pivot operation required. The multiple rows are generated from the "stacked" row in the eval(input: InternalRow) method of the Stack class which, for a given input ("stacked") row, iterates numRows (3 in our query) times. In every iteration, eval sets the values for the columns specified in the alias after the stack operation ((team, points) in our query) and uses them to build a new row that is added to the output dataset:

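    // Evaluate every argument after the row count against the input row
    val values = children.tail.map(_.eval(input)).toArray
    for (row <- 0 until numRows) yield {
      val fields = new Array[Any](numFields)
      for (col <- 0 until numFields) {
        val index = row * numFields + col
        // Pad with null when there are fewer values than cells
        fields.update(col, if (index < values.length) values(index) else null)
      }
      InternalRow(fields: _*)
    }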
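
The index arithmetic can be tried outside Spark with the following sketch. It is a plain Scala model under the same assumption about numFields as before (argument count divided by the number of rows, rounded up); the object and function names are hypothetical, and rows are modelled as simple sequences instead of InternalRow.

```scala
// Plain Scala model of the row generation loop (illustration only).
object StackEvalSketch {
  // values are the already evaluated STACK arguments after the row count.
  def stackRows(numRows: Int, values: Seq[Any]): Seq[Seq[Any]] = {
    val numFields = math.ceil(values.size.toDouble / numRows).toInt
    (0 until numRows).map { row =>
      (0 until numFields).map { col =>
        val index = row * numFields + col
        // Cells without a matching argument are filled with null.
        if (index < values.size) values(index) else null
      }
    }
  }
}
```

For the first source row of the example, `StackEvalSketch.stackRows(3, Seq("team1_new", 30, "team2_new", 300, "team3_new", 3000))` yields three rows of two cells each. When the arguments do not fill the grid, as in `stackRows(2, Seq(1, 2, 3))`, the second row ends with a null.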

I mentioned the shuffle in the previous paragraph on purpose. If you remember, the pivot operation works on top of RelationalGroupedDataset and, naively, we could think that the reverse operation does the opposite and goes back to the initial schema through another conversion to RelationalGroupedDataset. But as you saw in this article, stack is much simpler because the operation can remain local to the partition.
