After date time management, it's time to look at another important feature of Apache Spark 3.0: the new SQL functions.
ALTER DATABASE ... SET LOCATION
Strictly speaking, that's not a SQL function but a Hive command. It's a metadata operation that sets a new location for a given database. If it's not a SQL concept, why am I mentioning it here? When I first saw this command, I thought it could be a good way to implement an aliasing strategy for Big Data immutability (learn more in Big Data immutability approaches - aliasing), but I was wrong.
This command only changes the location for new tables and partitions; the existing ones aren't modified at all. If you thought like me, I hope this explanation will help you keep that in mind.
bit_and, bit_or, bit_xor
BIT_AND, BIT_OR and BIT_XOR are 3 new functions dealing with bitwise integer aggregates. You can see them in action in the following snippet:
"bitwise functions" should "apply to the column values" in {
  Seq(
    (1), (2), (3)
  ).toDF("number").createTempView("bitwise_test")

  val functionsString = sparkSession
    .sql("""
      |SELECT
      |CONCAT(
      | BIT_AND(number), ",", BIT_OR(number), ",", BIT_XOR(number)
      |) AS bitwise_concat
      |FROM bitwise_test""".stripMargin)
    .map(row => row.getAs[String]("bitwise_concat"))
    .collect()

  functionsString should have size 1
  functionsString(0) shouldEqual ("0,3,0")
}
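Outside Spark, the semantics of these aggregates are simply a fold with the corresponding bitwise operator. Here is a minimal plain-Python sketch (no Spark involved, just an illustration of what BIT_AND, BIT_OR and BIT_XOR compute) over the same values:

```python
from functools import reduce
import operator

numbers = [1, 2, 3]

# BIT_AND, BIT_OR and BIT_XOR are folds over the bitwise operators
bit_and = reduce(operator.and_, numbers)  # 1 & 2 & 3 = 0
bit_or = reduce(operator.or_, numbers)    # 1 | 2 | 3 = 3
bit_xor = reduce(operator.xor, numbers)   # 1 ^ 2 ^ 3 = 0

print(f"{bit_and},{bit_or},{bit_xor}")  # 0,3,0 - the same result as the Spark test
```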
date_part
From my previous article you learned that Apache Spark 3.0 brings a lot of changes in datetime management. Part of them are new temporal functions, and DATE_PART is one of them. As its name indicates, you can use it to retrieve a field from a timestamp or interval column. In the following snippet I'm using it to extract the day:
"date_part" should "extract day from a timestamp column" in {
  Seq(
    (new Timestamp(0L)), (new Timestamp(TimeUnit.DAYS.toMillis(2L)))
  ).toDF("dt").createTempView("date_part_test")

  val extractedDays = sparkSession
    .sql("SELECT DATE_PART('day', dt) AS extracted_day FROM date_part_test")
    .map(row => row.getAs[Integer]("extracted_day"))
    .collect()

  extractedDays should have size 2
  extractedDays should contain allOf(1, 3)
}
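The same extraction can be illustrated without Spark: DATE_PART('day', ...) returns the day-of-month component, just like accessing the day attribute of a Python datetime. A quick sketch over the same two timestamps (the epoch and the epoch plus 2 days):

```python
from datetime import datetime, timedelta, timezone

epoch = datetime.fromtimestamp(0, tz=timezone.utc)
timestamps = [epoch, epoch + timedelta(days=2)]  # 1970-01-01 and 1970-01-03

# DATE_PART('day', dt) extracts the day-of-month component
extracted_days = [ts.day for ts in timestamps]
print(extracted_days)  # [1, 3]
```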
Hyperbolic functions
To be honest, I didn't know this name when I saw the functions in the list. After some quick research I learned that the hyperbolic functions are like the trigonometric functions but defined with a hyperbola instead of a circle. They can be used, for instance, to describe the shape of the curve formed by a high-voltage line suspended between two towers. The following snippet compares the circular and hyperbolic versions:
"hyperbolic functions" should "return different results than not hyperbolic ones" in {
  Seq(
    (1.3)
  ).toDF("number").createTempView("hyperbolic_funcs_test")

  val functionsString = sparkSession
    .sql("""
      |SELECT
      |CONCAT(
      | acosh(number), ",", acos(number), ",",
      | asinh(number), ",", asin(number), ",",
      | atanh(number), ",", atan(number)
      |) AS funcs_concat
      |FROM hyperbolic_funcs_test""".stripMargin)
    .map(row => row.getAs[String]("funcs_concat"))
    .collect()

  functionsString should have size 1
  functionsString(0) shouldEqual "0.7564329108569596,NaN,1.078451058954897,NaN,NaN,0.9151007005533605"
}
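The NaN results above come from a domain difference: acos and asin are only defined for inputs in [-1, 1] and atanh for (-1, 1), while acosh accepts any x >= 1 and asinh any real number. Python's math module, used here purely as an illustration, exhibits the same behavior, except that it raises an exception where Spark SQL returns NaN:

```python
import math

x = 1.3

acosh_x = math.acosh(x)  # ~0.7564, defined for x >= 1
asinh_x = math.asinh(x)  # ~1.0785, defined for all reals
atan_x = math.atan(x)    # ~0.9151, defined for all reals
print(acosh_x, asinh_x, atan_x)

# The circular and inverse-tanh versions are out of their domain at 1.3:
undefined = []
for fn in (math.acos, math.asin, math.atanh):
    try:
        fn(x)
    except ValueError:
        undefined.append(fn.__name__)  # Spark SQL returns NaN instead of raising
print(undefined)  # ['acos', 'asin', 'atanh']
```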
make_date
As the name indicates, you can create a date-typed field from a year, month and day:
"make_date" should "create a date from 3 separated fields" in {
  Seq(
    (2020, 4, 1),
    (2019, 1, 20),
    (2020, 2, 31) // error on purpose, let's see what happens
  ).toDF("y", "m", "d").createTempView("make_date_test")

  val mappedDates = sparkSession
    .sql("SELECT CAST(make_date(y, m, d) AS STRING) AS date_from_ymd FROM make_date_test")
    .map(row => row.getAs[String]("date_from_ymd"))
    .collect()

  mappedDates should have size 3
  mappedDates should contain allOf(null, "2020-04-01", "2019-01-20")
}
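Note the null in the expected results: in the default (non-ANSI) mode used here, make_date returns null for an invalid date like February 31st instead of failing the query. The same tri-field construction, with invalid combinations mapped to None, can be sketched in plain Python (the make_date helper below is my own stand-in, not a Spark API):

```python
from datetime import date

rows = [(2020, 4, 1), (2019, 1, 20), (2020, 2, 31)]  # the last one is invalid on purpose

def make_date(y, m, d):
    """Build a date, or return None for an invalid combination (like Spark's null)."""
    try:
        return date(y, m, d)
    except ValueError:
        return None

dates = [make_date(*row) for row in rows]
print(dates)  # February 31st maps to None
```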
make_timestamp
Apart from the date, you can also create a timestamp-typed field:
"make_timestamp" should "create a timestamp from 5 separate fields" in {
  Seq(
    (2020, 4, 1, 10, 20),
    (2019, 1, 20, 15, 59),
    (2020, 2, 2, 25, 10) // error on purpose, let's see what happens
  ).toDF("y", "m", "d", "h", "min").createTempView("make_timestamp_test")

  val mappedTs = sparkSession
    .sql("SELECT CAST(make_timestamp(y, m, d, h, min, 0, 'UTC') AS STRING) " +
      "AS ts_from_ymd FROM make_timestamp_test")
    .map(row => row.getAs[String]("ts_from_ymd"))
    .collect()

  mappedTs should have size 3
  // the UTC timestamps are rendered in the session timezone, hence the shifted hours
  mappedTs should contain allOf(null, "2020-04-01 12:20:00", "2019-01-20 16:59:00")
}
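The invalid-input behavior mirrors make_date: an impossible hour like 25 produces null rather than an error. A plain-Python sketch of the same construction (again, my own illustrative helper, not a Spark API):

```python
from datetime import datetime

rows = [(2020, 4, 1, 10, 20), (2019, 1, 20, 15, 59), (2020, 2, 2, 25, 10)]

def make_timestamp(y, m, d, h, minute, sec=0):
    """Build a timestamp, or return None for invalid parts (like Spark's null)."""
    try:
        return datetime(y, m, d, h, minute, sec)
    except ValueError:
        return None

timestamps = [make_timestamp(*row) for row in rows]
print(timestamps)  # the third entry is None because 25 is not a valid hour
```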
max_by, min_by
These 2 functions are quite interesting since they came to the 3.0 release from Presto. MAX_BY(field1, field2) returns the value of field1 associated with the maximum value of field2 over all input rows. MIN_BY does the opposite. You can see that in the following test case:
"min_by and max_by" should "retrieve a column for min and max values" in {
  Seq(
    (2020, 4, 1, "year 2020"),
    (2019, 1, 20, "year 2019"),
    (2018, 2, 2, "year 2018")
  ).toDF("y", "m", "d", "label").createTempView("min_by_max_by_test")

  val minAndMaxLabels = sparkSession
    .sql("SELECT min_by(label, y) AS minLabel, max_by(label, y) AS maxLabel FROM min_by_max_by_test")
    .map(row => (row.getAs[String]("minLabel"), row.getAs[String]("maxLabel")))
    .collect()

  minAndMaxLabels should have size 1
  minAndMaxLabels(0) shouldEqual ("year 2018", "year 2020")
}
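Conceptually, MAX_BY and MIN_BY are argmax/argmin aggregates: pick the row with the extreme key, then return a different column from that row. Python's built-in max and min with a key function express the same idea:

```python
rows = [(2020, "year 2020"), (2019, "year 2019"), (2018, "year 2018")]

# max_by(label, y): the label of the row with the maximum y (argmax)
max_label = max(rows, key=lambda row: row[0])[1]
# min_by(label, y): the label of the row with the minimum y (argmin)
min_label = min(rows, key=lambda row: row[0])[1]

print(min_label, max_label)  # year 2018 year 2020
```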
overlay
The next added function, OVERLAY, replaces a substring with another one:
"overlay" should "replace the first 3 letters by xxx" in {
  Seq(
    ("abcdef"),
    ("gh"),
    ("ijk"),
    ("lmn")
  ).toDF("label").createTempView("overlay_test")

  val formattedLabels = sparkSession
    .sql("SELECT OVERLAY(label PLACING 'xxx' FROM 1) AS formattedLabel FROM overlay_test")
    .map(row => row.getAs[String]("formattedLabel"))
    .collect()

  formattedLabels should have size 4
  // As you can see, OVERLAY appends new characters when the column is shorter than the replacement
  formattedLabels should contain allElementsOf (Seq("xxxdef", "xxx", "xxx", "xxx"))
}
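OVERLAY(input PLACING replacement FROM pos) writes the replacement into the input starting at the 1-based position pos, extending the string when the input is shorter. A plain-Python sketch of that semantics (my own helper, ignoring the optional FOR length argument):

```python
def overlay(text, replacement, pos):
    """Replace len(replacement) characters of text, starting at 1-based pos."""
    return text[:pos - 1] + replacement + text[pos - 1 + len(replacement):]

labels = ["abcdef", "gh", "ijk", "lmn"]
results = [overlay(label, "xxx", 1) for label in labels]
print(results)  # short inputs get extended, as in the test above
```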
New aliases
Apart from genuinely new functions, Apache Spark 3.0 also brings a few new aliases for already existing ones. Thanks to them you can use BOOL_AND instead of EVERY, BOOL_OR or SOME instead of ANY, and RANDOM instead of RAND:
"new aliases" should "do the same thing like existing methods" in {
  Seq(
    (true, false),
    (true, true),
    (true, false)
  ).toDF("bool1", "bool2").createTempView("aliases_test")

  val boolAliases = sparkSession
    .sql(
      """
        |SELECT
        |BOOL_AND(bool1) AS bool_and_version, EVERY(bool1) AS every_version,
        |BOOL_OR(bool2) AS bool_or_version, ANY(bool2) AS any_version
        |FROM aliases_test""".stripMargin)
    .map(row =>
      (row.getAs[Boolean]("bool_and_version"), row.getAs[Boolean]("every_version"),
        row.getAs[Boolean]("bool_or_version"), row.getAs[Boolean]("any_version")))
    .collect()

  boolAliases should have size 1
  boolAliases(0) shouldEqual (true, true, true, true)
}
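The boolean aggregates map directly onto universal and existential quantification, i.e. Python's built-in all and any, shown here on the same column values as the test:

```python
bool1 = [True, True, True]
bool2 = [False, True, False]

# BOOL_AND / EVERY: true only if every input is true
print(all(bool1), all(bool2))  # True False
# BOOL_OR / ANY / SOME: true if at least one input is true
print(any(bool1), any(bool2))  # True True
```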
"new aliases for random" should "generate different results every time" in {
  Seq(
    (true, false),
    (true, true),
    (true, false)
  ).toDF("bool1", "bool2").createTempView("random_aliases_test")

  val randomAliases = sparkSession
    .sql(
      """
        |SELECT RANDOM() AS random_v, RAND() AS rand_v
        |FROM random_aliases_test""".stripMargin)
    .map(row =>
      (row.getAs[Double]("random_v"), row.getAs[Double]("rand_v")))
    .collect()

  randomAliases should have size 3
}
Update February 5, 2021: I missed this change and thank you, German, very much for sharing it with me! Starting from Spark 3 you can also set a custom comparator function to array_sort.
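As far as I understand, the comparator given to array_sort in Spark 3 is a lambda returning a negative, zero or positive integer. Python offers the same contract via functools.cmp_to_key; the descending comparator below is my own illustrative example, not taken from Spark:

```python
from functools import cmp_to_key

def descending(left, right):
    """Comparator returning a negative/zero/positive integer, like array_sort's lambda."""
    return right - left

numbers = [3, 1, 2]
sorted_numbers = sorted(numbers, key=cmp_to_key(descending))
print(sorted_numbers)  # [3, 2, 1]
```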
Reverted changes
When I was writing this blog post, I was so happy to discover new functions that I almost forgot that some of them were reverted. The first of them, MAKE_INTERVAL, was initially added but finally removed because it was considered too PostgreSQL-specific. Another example: JUSTIFY_DAYS, JUSTIFY_HOURS and JUSTIFY_INTERVAL were removed after being merged to master, for the same reason.
Maybe the functions added in this release are less revolutionary than the higher-order functions introduced in 2.4, but that appearance can be misleading. After all, they make it possible to cover new use cases, especially if you need to migrate your processing code from a standard SQL-based solution to a more distributed environment on top of Apache Spark SQL.
Read also about What's new in Apache Spark 3.0 - new SQL functions here:
- ALTER DATABASE Support ALTER DATABASE SET LOCATION
- Add support for bit_and and bit_or aggregates
- Add support for bit_xor aggregate function
- Date/Time Functions: date_part for timestamps
- Date/Time Functions: date_part for intervals
- Hyperbolic functions
- Hyperbolic Functions
- Add `make_date` function
- Date/Time Functions: make_timestamp
- Add max_by() / min_by() SQL aggregate functions
- ANSI SQL: OVERLAY function (T312)
- Implement boolean aggregates (BOOL_AND, BOOL_OR and EVERY)
- Adds `random()` sql function
- Add the justify_days(), justify_hours() and justify_interval() functions
- Add make_interval() function
Related blog posts:
- What's new in Apache Spark 3.0 - Kubernetes
- What's new in Apache Spark 3.0 - GPU-aware scheduling
- What's new in Apache Spark 3 - Structured Streaming
- What's new in Apache Spark 3.0 - UI changes
- What's new in Apache Spark 3.0 - dynamic partition pruning
#ApacheSpark 3 discovery in progress ? Today I presented some new functions added to #SparkSQL https://t.co/sghB7sYVig
— Bartosz Konieczny (@waitingforcode) June 27, 2020
