The pivot operation presented 2 weeks ago transforms some cells into columns. The reverse operation is called stack and it's time to see how it works!
The structure of this blog post is quite similar to the previous one. The first section shows an example of the STACK operation, whereas the next one covers some execution details.
STACK example
Let's imagine that the PIVOT operation generated the following dataset:
+---+-----+-----+-----+
|id |team1|team2|team3|
+---+-----+-----+-----+
|1  |30   |300  |3000 |
|2  |50   |500  |5000 |
|3  |100  |1000 |10000|
|4  |200  |2000 |20000|
+---+-----+-----+-----+
But our target is a table like this one:
+---+---------+------+
| id|     team|points|
+---+---------+------+
|  1|team1_new|    30|
|  1|team2_new|   300|
|  1|team3_new|  3000|
|  2|team1_new|    50|
|  2|team2_new|   500|
|  2|team3_new|  5000|
|  3|team1_new|   100|
|  3|team2_new|  1000|
|  3|team3_new| 10000|
|  4|team1_new|   200|
|  4|team2_new|  2000|
|  4|team3_new| 20000|
+---+---------+------+
The function that can transform the source dataset into the target one quite seamlessly is called STACK. It takes as its first parameter the number of rows to create, followed by the "list" of (alias, column) pairs to distribute across these rows:
SELECT id, STACK(3, 'team1_new', team1, 'team2_new', team2, 'team3_new', team3) AS (team, points) FROM pivoted_table
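If you prefer the programmatic API, the same transformation can be expressed with selectExpr. Below is a minimal sketch; the sparkSession value and the inlined dataset are assumptions made only for the illustration:

import sparkSession.implicits._

// the "pivoted" input recreated locally for the illustration
val pivotedTable = Seq(
  (1, 30, 300, 3000), (2, 50, 500, 5000),
  (3, 100, 1000, 10000), (4, 200, 2000, 20000)
).toDF("id", "team1", "team2", "team3")
pivotedTable.createOrReplaceTempView("pivoted_table")

// selectExpr accepts the same STACK expression as the SQL query
val stackedTable = pivotedTable.selectExpr(
  "id",
  "STACK(3, 'team1_new', team1, 'team2_new', team2, 'team3_new', team3) AS (team, points)"
)
stackedTable.show(false)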
Analysis stage
If you're looking for the STACK operation in the source code, you should find it pretty quickly because it's represented by the ... Stack class. It extends the Generator trait and, thanks to that, categorizing the stack operation is easy. In the Scaladoc of the Generator you can read that:
An expression that produces zero or more rows given a single input row.
Generators produce multiple output rows instead of a single value like other expressions, and thus they must have a schema to associate with the rows that are output.
Pretty clear, right? And it makes sense if we apply this description to the Stack node. If you look at our example, you will see that one row, initially composed of the (id, team1, team2, team3) columns, was split into 3 rows. As stated in the quoted comment, an important element of this operation is the output schema, exposed here:
case class Stack(children: Seq[Expression]) extends Generator {
  // ...
  override def elementSchema: StructType =
    StructType(children.tail.take(numFields).zipWithIndex.map {
      case (e, index) => StructField(s"col$index", e.dataType)
    })
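To make the schema more concrete, here is what it resolves to for our query. The snippet below is an illustration and not a part of Apache Spark's source code. With STACK(3, ...), numFields is 2, so only the first ('team1_new', team1) pair drives the field types; the col0/col1 names are later renamed by the AS (team, points) alias:

import org.apache.spark.sql.types._

// elementSchema computed for
// STACK(3, 'team1_new', team1, 'team2_new', team2, 'team3_new', team3):
// col0 comes from the 'team1_new' literal, col1 from the team1 column
val elementSchema = StructType(Seq(
  StructField("col0", StringType),
  StructField("col1", IntegerType)
))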
The Stack generator uses this schema at the analysis stage to validate the "stacked" columns. This verification logic is implemented in the checkInputDataTypes() method. In addition to basic checks like the number of parameters, it verifies whether the data types from the output schema ((team, points) in our query) are the same as the data types of the "stacked" expressions (e.g. 'team1_new', team1 in our query). If any of them is incompatible, an AnalysisException is thrown:
for (i <- 1 until children.length) {
  val j = (i - 1) % numFields
  if (children(i).dataType != elementSchema.fields(j).dataType) {
    return TypeCheckResult.TypeCheckFailure(
      s"Argument ${j + 1} (${elementSchema.fields(j).dataType.catalogString}) != " +
        s"Argument $i (${children(i).dataType.catalogString})")
  }
}
TypeCheckResult.TypeCheckSuccess
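To see this check in action, a query mixing incompatible data types at the same position should fail at the analysis stage. The query below is a sketch and the exact error message may differ between Spark versions:

-- 123 (an integer literal) sits at the same position as the 'team1_new'
-- and 'team3_new' string literals, so the argument types are inconsistent
-- and the query fails with an AnalysisException
SELECT id, STACK(3, 'team1_new', team1, 123, team2, 'team3_new', team3) AS (team, points)
FROM pivoted_table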
Physical execution
The analyzed and optimized plans for our code look like this:
== Analyzed Logical Plan ==
id: int, team: string, points: int
Project [id#13, team#38, points#39]
+- Generate stack(3, team1_new, team1#14, team2_new, team2#15, team3_new, team3#16), false, [team#38, points#39]
   +- SubqueryAlias pivoted_table
      +- Project [_1#4 AS id#13, _2#5 AS team1#14, _3#6 AS team2#15, _4#7 AS team3#16]
         +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Generate stack(3, team1_new, team1#14, team2_new, team2#15, team3_new, team3#16), [1, 2, 3], false, [team#38, points#39]
+- LocalRelation [id#13, team1#14, team2#15, team3#16]
As you can observe, the stack function is called directly, without any prior aggregation and shuffle, unlike the pivot operation. The generation of multiple rows from the "stacked" row happens in the eval(input: InternalRow) method of the Stack class which, for a given input ("stacked") row, iterates numRows (3 in our query) times. For every iteration, the eval function sets the values for the columns specified in the alias following the stack operation ((team, points) in our query). At the end, it uses these values to generate a new row that is added to the output dataset:
val values = children.tail.map(_.eval(input)).toArray
for (row <- 0 until numRows) yield {
  val fields = new Array[Any](numFields)
  for (col <- 0 until numFields) {
    val index = row * numFields + col
    fields.update(col, if (index < values.length) values(index) else null)
  }
  InternalRow(fields: _*)
}
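To visualize how the values are distributed across the generated rows, here is a standalone sketch (plain Scala, outside Spark) that reproduces the indexing logic for the first input row of our example:

// illustration of the row generation for the input row (id=1, 30, 300, 3000);
// values corresponds to the evaluated children.tail expressions
val numRows = 3
val numFields = 2
val values = Array[Any]("team1_new", 30, "team2_new", 300, "team3_new", 3000)

val generatedRows = for (row <- 0 until numRows) yield {
  val fields = new Array[Any](numFields)
  for (col <- 0 until numFields) {
    val index = row * numFields + col
    fields(col) = if (index < values.length) values(index) else null
  }
  fields.toSeq
}
// generatedRows: Seq(team1_new, 30), Seq(team2_new, 300), Seq(team3_new, 3000)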
I mentioned the shuffle earlier on purpose. If you remember, the pivot operation works on top of RelationalGroupedDataset and, naively, we could think that the reverse operation will do the opposite, i.e. go back to the initial schema through another conversion to RelationalGroupedDataset. But as you saw in this article, it's much simpler than that since the operation can remain local to the partition.
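If you want to confirm the absence of the shuffle yourself, printing the physical plan of the stacked dataset should show a Generate node but no Exchange. A minimal sketch, assuming the stackedTable from the first example:

// no Exchange (shuffle) node should appear in the physical plan
stackedTable.explain(true)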