Apache Spark 2.4.0 features - array and higher-order functions

on waitingforcode.com

The series about the features introduced in Apache Spark 2.4.0 continues. Today's post will cover higher-order functions that you may know from elsewhere.

The post is divided into 4 parts. The first one explains the purpose of the new feature. The second and the third ones show the array functions and the higher-order functions added in the 2.4.0 release. The last part focuses on the execution plan of some of them.

Higher-order functions definition

If you've already heard about higher-order functions in a different context, it was probably when you were learning about functional programming. Simply speaking, a higher-order function is a function that takes another function as a parameter or returns a new function as a result. In Scala it can be illustrated as:

  def sumWithMagicNumber(nr1: Int, nr2: Int) = nr1 + nr2 + 3

  def sumWithANumber(sumFunction: (Int, Int) => Int, nr1: Int, nr2: Int) = sumFunction(nr1, nr2)

  sumWithANumber(sumWithMagicNumber, 1, 2)
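
The snippet above covers only the first half of the definition, a function taking another function as a parameter. A minimal sketch of the second half, a function returning a new function, could look like this (`ReturnedFunctionExample`, `adderOf` and `addThree` are names invented for the illustration):

```scala
object ReturnedFunctionExample {
  // Higher-order function: builds and returns a new function adding `offset` to its input
  def adderOf(offset: Int): Int => Int = (nr: Int) => nr + offset

  // `addThree` is itself a function; it remembers the offset it was created with
  val addThree: Int => Int = adderOf(3)

  def main(args: Array[String]): Unit = {
    // 1 + 2 + 3, the same arithmetic as sumWithANumber(sumWithMagicNumber, 1, 2)
    println(addThree(1 + 2))
  }
}
```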

If they come from the functional paradigm, why would we need them in distributed data processing? In fact, higher-order functions are a great way to solve the problem of manipulating nested data structures such as arrays. Thanks to them we can define a value-level function and pass it to a function working at the data type level. It's then up to the latter to apply the value-level function to all the values it contains.

Using higher-order functions is much easier than the solutions available prior to the 2.4.0 release. Before, we had 2 choices: explode the nested structure and collect it back after applying the transformation logic on it, or write a User Defined Function. However, both have important drawbacks. They are slow because they involve shuffling and serialization. Moreover, they are not standard methods and may vary from one team to another, which can make the code messier.
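
To make these 2 older choices more concrete, here is a minimal sketch of both, upper-casing the letters of an array. The dataset, the column names and the `PreSpark24Approaches` object are assumptions made for the illustration, not code from the original tests:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, explode, udf, upper}

object PreSpark24Approaches {
  // Choice 1: explode the array, transform every value, then group and collect back.
  // The groupBy needs a shuffle and collect_list gives no ordering guarantee.
  def upperWithExplode(letters: DataFrame): DataFrame =
    letters.select(col("id"), explode(col("letters")).as("letter"))
      .groupBy(col("id"))
      .agg(collect_list(upper(col("letter"))).as("upper_letters"))

  // Choice 2: a User Defined Function; the array is converted to a Scala Seq and back
  private val upperAll = udf((values: Seq[String]) => values.map(_.toUpperCase))

  def upperWithUdf(letters: DataFrame): DataFrame =
    letters.select(col("id"), upperAll(col("letters")).as("upper_letters"))

  def main(args: Array[String]): Unit = {
    // Assumed local session, only for the demonstration
    val spark = SparkSession.builder().master("local[*]").appName("pre-2.4 approaches").getOrCreate()
    import spark.implicits._

    val letters = Seq((1L, Array("a", "b")), (2L, Array("c"))).toDF("id", "letters")
    upperWithExplode(letters).show()
    upperWithUdf(letters).show()
    spark.stop()
  }
}
```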

To mitigate all these issues, the 2.4.0 release introduced higher-order functions, i.e. functions that take other functions and apply them at the level of the values defined in the nested data structure. An example is a function able to apply a map logic on the values contained in an array. It's only one case among many others that I'll present together with examples in the next sections.

Aside from higher-order functions, Apache Spark comes with a wide range of new functions to manipulate nested data, particularly arrays.

Array functions list

Let's see precisely what we can do thanks to these new functions:

  • sort_array - as you can easily deduce, this function sorts the entries of the array either in ascending or in descending order:
      "sort_array" should "sort the letters in ascending order" in {
        val testedDataset = Seq((Array("a", "b", "d", "e", "c"))).toDF("letters")
    
        val sortedLetters = testedDataset.select(sort_array($"letters", asc = true).as("sorted_letters"))
    
        val functionResult = stringifyResult(sortedLetters, "sorted_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b, c, d, e"
      }
    
  • array_max - takes the maximum value of the array:
      "array_max" should "get the max letter of the array" in {
        val testedDataset = Seq((Array("a", "b", "d", "e", "c"))).toDF("letters")
    
        val maxLetters = testedDataset.select(array_max($"letters").as("max_letter"))
    
        val functionResult = maxLetters.collect().map(row => row.getAs[String]("max_letter"))
        functionResult should have size 1
        functionResult(0) shouldEqual "e"
      }
    
  • array_min - takes the minimum value of the underlying array:
      "array_min" should "get the min letter of the array" in {
        val testedDataset = Seq((Array("a", "b", "d", "e", "c"))).toDF("letters")
    
        val minLetters = testedDataset.select(array_min($"letters").as("min_letter"))
    
        val functionResult = minLetters.collect().map(row => row.getAs[String]("min_letter"))
        functionResult should have size 1
        functionResult(0) shouldEqual "a"
      }
    
  • array_position - returns the 1-based position of the given element in the array, or 0 if the element doesn't exist:
      "array_position" should "return the position of existent and missing letters" in {
        val testedDataset = Seq((Array("a", "b", "d", "e", "c"))).toDF("letters")
    
        val positions = testedDataset.select(array_position($"letters", "b").as("b_position"),
          array_position($"letters", "z").as("z_position"))
    
        val functionResult = positions.collect().map(row => (row.getAs[Long]("b_position"), row.getAs[Long]("z_position")))
        functionResult should have size 1
        // As you can see, the position is not 0-based, 0 is used to mark the missing index
        functionResult(0) shouldEqual (2, 0)
      }
    
  • array_remove - removes all specified elements from the array:
      "array_remove" should "remove the 'a' letter" in {
        val testedDataset = Seq((Array("a", "b", "c", "d", "a"))).toDF("letters")
    
        val lettersWithoutA = testedDataset.select(array_remove($"letters", "a").as("letters_without_a"))
    
        val functionResult = stringifyResult(lettersWithoutA, "letters_without_a")
        functionResult should have size 1
        functionResult(0) shouldEqual "b, c, d"
      }
    
  • element_at - returns the element at given position for an array or at given key for a map:
      "element_at" should "return the letters at specific positions" in {
        val testedDataset = Seq((Array("a", "b", "c", "d"))).toDF("letters")
    
        // element_at is 1-based: position 1 is the first letter and position 5 is out of bounds
        val lettersFromPositions = testedDataset.select(element_at($"letters", 1).as("letter1"),
          element_at($"letters", 5).as("letter5"))
    
        val functionResult = lettersFromPositions.collect().map(row => (row.getAs[String]("letter1"), row.getAs[String]("letter5")))
        functionResult should have size 1
        functionResult(0) shouldEqual ("a", null)
      }
    
      "element_at" should "return the value from the map" in {
        val testedDataset = Seq((Map("a" -> 3)), (Map("b" -> 4))).toDF("letters")
    
        val lettersFromPositions = testedDataset.select(element_at($"letters", "a").as("value_of_a"))
    
        val functionResult = lettersFromPositions.collect().map(row => {
          if (row.isNullAt(row.fieldIndex("value_of_a"))) {
            null
          } else {
            row.getAs[Int]("value_of_a")
          }
        })
        functionResult should have size 2
        functionResult should contain allOf(3, null)
      }
    
  • flatten - transforms an array of arrays into a single array. It flattens only the first nested level:
      "flatten" should "transform nested Arrays to one-level Array" in {
        val testedDatasetWithArrayOfArrays = Seq((1L, Array(Array("a", "b"), Array("c", "d", "e")))).toDF("id", "letters")
    
        val flattenedLetters = testedDatasetWithArrayOfArrays.select(flatten($"letters").as("flattened_letters"))
    
        val functionResult = stringifyResult(flattenedLetters, "flattened_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b, c, d, e"
      }
    
  • reverse - returns the elements of the array in reverse order:
      "reverse" should "return the array from the end" in {
        val testedDataset = Seq((Array("a", "b", "c", "d"))).toDF("letters")
    
        val reversedArrayLetters = testedDataset.select(reverse($"letters").as("reversed_letters"))
    
        val functionResult = stringifyResult(reversedArrayLetters, "reversed_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "d, c, b, a"
      }
    
  • sequence - generates a sequence of integers between 2 numbers, with the possibility to define the step attribute:
      "sequence" should "create an array of numbers from a range with a step" in {
        val testedDatasetWithRange = Seq((2L, 10L)).toDF("nr1", "nr2")
        val sequenceNumbers = testedDatasetWithRange.select(sequence($"nr1", $"nr2", $"nr1").as("numbers"))
    
        // The input columns are Longs, so the generated array holds Longs exposed as a Seq
        val functionResult = sequenceNumbers.collect().map(row => row.getAs[Seq[Long]]("numbers").mkString(", "))
        functionResult should have size 1
        functionResult(0) shouldEqual "2, 4, 6, 8, 10"
      }
    
  • slice - returns a sub-array of the given length, starting at the given 1-based index:
      "slice" should "return the specified part of the array" in {
        val testedDataset = Seq((Array("a", "b", "c", "d"))).toDF("letters")
    
        val slicedLetters = testedDataset.select(slice($"letters", 2, 3).as("sliced_letters"))
    
        val functionResult = stringifyResult(slicedLetters, "sliced_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "b, c, d"
      }
    
  • shuffle - generates a random, and hence not deterministic, permutation of the array:
      "shuffle" should "return a random permutation of the array" in {
        val testedDataset = Seq((Array("a", "b", "c", "d"))).toDF("letters")
    
        val shuffledLetters1 = testedDataset.select(shuffle($"letters").as("shuffled_letters"))
        val shuffledLetters2 = testedDataset.select(shuffle($"letters").as("shuffled_letters"))
    
        val functionResult1 = stringifyResult(shuffledLetters1, "shuffled_letters")
        val functionResult2 = stringifyResult(shuffledLetters2, "shuffled_letters")
        functionResult1 should have size 1
        functionResult2 should have size 1
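        // Caution: this assertion can occasionally fail since 2 random permutations may be identical by chance
        functionResult1(0) shouldNot equal(functionResult2(0))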
      }
    
  • cardinality - returns the number of elements in given array:
      "cardinality" should "return the size of the arrays" in {
        val testedDataset = Seq((Array("a", "b", "c", "d")), (Array("a", "a"))).toDF("letters")
    
        val lettersWithCardinality = testedDataset.selectExpr("cardinality(letters) AS letters_number")
    
        val functionResult = lettersWithCardinality.collect().map(row => row.getAs[Int]("letters_number"))
        functionResult should have size 2
        functionResult should contain allOf(4, 2) 
      }
    
  • array_repeat - creates an array with initial value repeated N times:
      "array_repeat" should "create arrays with letter repeated twice" in {
        val lettersDataset = Seq(("a"), ("b")).toDF("letter")
        val repeatedLetters = lettersDataset.select(array_repeat($"letter", 2).as("repeated_letters"))
    
        val functionResult = stringifyResult(repeatedLetters, "repeated_letters")
        functionResult should have size 2
        functionResult should contain allOf("a, a", "b, b") 
      }
    
  • array_distinct - removes the duplicated elements from the array:
      "array_distinct" should "create an array without duplicates" in {
        val lettersDataset = Seq((Array("a", "a", "b", "c", "b", "a"))).toDF("duplicated_letters")
        val distinctLetters = lettersDataset.select(array_distinct($"duplicated_letters").as("distinct_letters"))
    
        val functionResult = stringifyResult(distinctLetters, "distinct_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b, c" 
      }
    
  • array_intersect - returns an array containing the elements present in both intersected arrays:
      "array_intersect" should "return distinct common letters of 2 arrays" in {
        // "a" appears 4 times in the 1st array and 3 times in the 2nd one
        val lettersDataset = Seq((Array("a", "a", "a", "a", "b", "c", "b"), Array("a", "a", "a", "b", "d", "e")))
          .toDF("letters1", "letters2")
        val commonLetters = lettersDataset.select(array_intersect($"letters1", $"letters2").as("common_letters"))
    
        val functionResult = stringifyResult(commonLetters, "common_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b" 
      }
    
  • array_union - returns a union of 2 arrays, without duplicates:
      "array_union" should "concatenate 2 arrays" in {
        val lettersDataset = Seq((Array("a", "b"), Array("c", "d"))).toDF("letters1", "letters2")
        val concatenatedArrays = lettersDataset.select(array_union($"letters1", $"letters2").as("concatenated_arrays"))
    
        val functionResult = stringifyResult(concatenatedArrays, "concatenated_arrays")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b, c, d" 
      }
    
  • array_except - returns an array with the elements of the first array that are not defined in the second one:
      "array_except" should "return an array with the elements of the 1st array missing in the 2nd array" in {
        val lettersDataset = Seq((Array("a", "b", "c"), Array("c", "d"))).toDF("letters1", "letters2")
        val differentLetters = lettersDataset.select(array_except($"letters1", $"letters2").as("different_letters"))
    
        val functionResult = stringifyResult(differentLetters, "different_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b" 
      }
    
  • array_join - concatenates the elements of an array with a delimiter:
      "array_join" should "concatenate array's content" in {
        val lettersDataset = Seq((Array("a", "b", "c"))).toDF("letters")
        val concatenatedLetters = lettersDataset.select(array_join($"letters", ",").as("concatenated_letters"))
    
        val functionResult = concatenatedLetters.collect().map(row => row.getAs[String]("concatenated_letters"))
        functionResult should have size 1
        functionResult(0) shouldEqual "a,b,c" 
      }
    
  • arrays_overlap - returns a boolean flag which is true if both analyzed arrays have at least one non-null element in common:
      "arrays_overlap" should "detect whether 2 arrays have a common letter" in {
        val lettersDataset = Seq((Array("a", "b", "c"), Array("c", "d")),
          (Array("a", "b"), Array("c", "d"))).toDF("letters1", "letters2")
        val oneCommonEntry = lettersDataset.select(arrays_overlap($"letters1", $"letters2").as("one_common_entry_flag"))
    
        val functionResult = oneCommonEntry.collect().map(row => row.getAs[Boolean]("one_common_entry_flag"))
        functionResult should have size 2
        functionResult should contain allOf(true, false) 
      }
    
  • concat - concatenates multiple arrays into a single one:
      "concat" should "concatenate 3 arrays into a single one" in {
        val lettersDataset = Seq((Array("a", "b"), Array("c", "d"), Array("e", "f"))).toDF("letters1", "letters2", "letters3")
        val concatenatedArrays = lettersDataset.select(concat($"letters1", $"letters2", $"letters3").as("concatenated_arrays"))
    
        val functionResult = stringifyResult(concatenatedArrays, "concatenated_arrays")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, b, c, d, e, f" 
      }
    
  • arrays_zip - creates new array by mixing the elements of the same position of input arrays:
      "arrays_zip" should "create arrays by mixing input arrays at the same position" in {
        // It won't work for (Array("e", "b"), Array(), Array("g", "h")) because of :
        // Array[_ <: java.lang.String] (of class scala.reflect.internal.Types$ExistentialType)
        val lettersDataset = Seq((Array("a", "b"), Array("c", "d"), Array("e", "f")), (Array("e", "b"), Array("X"), Array("g", "h")) )
          .toDF("letters1", "letters2", "letters3")
        val zippedLetters = lettersDataset.select(arrays_zip($"letters1", $"letters2", $"letters3").as("zipped_letters"))
    
        val functionResult = stringifyResult(zippedLetters, "zipped_letters")
        functionResult should have size 2
        functionResult should contain allOf ("[a,c,e], [b,d,f]", "[e,X,g], [b,null,h]") 
      }
    
  • map_from_entries - creates a map from an array of entries (tuples):
      "map_from_entries" should "create a map from an array" in {
        val lettersDataset = Seq((Array(("a", 1), ("b", 2), ("c", 3), ("d", 4)))).toDF("letters")
        val mappedArraysFromEntries = lettersDataset.select(map_from_entries($"letters").as("mapped_arrays_from_entries"))
    
        val functionResult = stringifyResult(mappedArraysFromEntries, "mapped_arrays_from_entries")
        functionResult should have size 1
        functionResult(0) shouldEqual "a -> 1, b -> 2, c -> 3, d -> 4" 
      }
    
  • map_from_arrays - creates a map from 2 arrays. The former is used as keys, the latter as values:
      "map_from_arrays" should "create a map from 2 arrays" in {
        // It fails when the 2 arrays don't have the same length with:
        // java.lang.RuntimeException: The given two arrays should have the same length
        val lettersDataset = Seq((Array("a", "b", "c"), Array("d", "e", "f"))).toDF("letters1", "letters2")
        val mappedArrays = lettersDataset.select(map_from_arrays($"letters1", $"letters2").as("mapped_arrays"))
    
        val functionResult = stringifyResult(mappedArrays, "mapped_arrays")
        functionResult should have size 1
        functionResult(0) shouldEqual "a -> d, b -> e, c -> f" 
      }
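
The tests above rely on a `stringifyResult` helper and on Spark implicits (`toDF`, `$"..."`) that the post doesn't show. A hypothetical reconstruction, inferred only from the expected strings in the assertions, could look like the sketch below; the `TestHelpersSketch` object and the local session settings are assumptions:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestHelpersSketch {
  // Assumed setup: a local session; importing sparkSession.implicits._ in the test class
  // provides toDF and the $"column" syntax
  lazy val sparkSession: SparkSession = SparkSession.builder()
    .master("local[*]").appName("Spark 2.4.0 array functions").getOrCreate()

  // Hypothetical stringifyResult: renders the array or map stored in `columnName`
  // of every collected row as one ", "-separated string
  def stringifyResult(dataset: DataFrame, columnName: String): Array[String] = {
    dataset.collect().map { row =>
      row.getAs[Any](columnName) match {
        case values: scala.collection.Seq[_] => values.mkString(", ")
        case entries: scala.collection.Map[_, _] =>
          entries.map { case (key, value) => s"$key -> $value" }.mkString(", ")
        case other => String.valueOf(other)
      }
    }
  }
}
```

With such a helper, an array column holding a, b, c becomes "a, b, c" and a map column becomes "a -> 1, b -> 2", which matches the format expected by the assertions.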
    

Higher-order functions

After discovering the new array functions of the 2.4.0 release, let's see some of the higher-order functions:

  • transform - maps the content of the array with the defined mapping function:
      "transform" should "concatenate letters with indexes" in {
        val lettersDataset = Seq((Array("a", "b", "c"))).toDF("letters")
        val transformedLetters = lettersDataset.selectExpr("transform(letters, (letter, i) -> concat(\"index \", i, \" value = \", letter))" +
          " AS transformed_letters")
    
        val functionResult = stringifyResult(transformedLetters, "transformed_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "index 0 value = a, index 1 value = b, index 2 value = c" 
      }
    
  • filter - applies predicate on all nested values and returns an array with the elements matching the predicate:
      "filter" should "remove all letters except a" in {
        val lettersDataset = Seq((Array("a", "b", "c", "a"))).toDF("letters")
        val filteredLetters = lettersDataset.selectExpr("filter(letters, letter -> letter == 'a')" +
          " AS filtered_letters")
    
        val functionResult = stringifyResult(filteredLetters, "filtered_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a, a" 
      }
    
  • aggregate - returns a single value generated from an array, an initial state and an aggregate function:
      "aggregate" should "concatenate the letters with a prefix" in {
        val lettersDataset = Seq((Array("a", "b", "c"))).toDF("letters")
        val aggregatedLetters = lettersDataset.selectExpr("aggregate(letters, 'letters', (lettersPrefix, letter) -> concat(lettersPrefix, ' -> ', letter))" +
          " AS aggregated_letters")
    
        val functionResult = aggregatedLetters.collect().map(row => row.getAs[String]("aggregated_letters"))
        functionResult should have size 1
        functionResult(0) shouldEqual "letters -> a -> b -> c" 
      }
    
  • exists - checks whether an array contains at least one element that can be evaluated as true after applying the predicate on it:
      "exists" should "check whether arrays contain searched letter" in {
        val testedDataset = Seq((Array("a", "b", "c", "d")), (Array("e", "f", "g"))).toDF("letters")
        val existenceFlag = testedDataset.selectExpr("exists(letters, letter -> letter = 'a') AS existence_flag")
    
        val functionResult = existenceFlag.collect().map(row => row.getAs[Boolean]("existence_flag"))
        functionResult should have size 2
        functionResult should contain allOf(true, false) 
      }
    
  • zip_with - merges 2 arrays of the same length into a new array with a merge function:
      "zip_with" should "merge 2 arrays of the same length" in {
        val lettersDataset = Seq((Array("a", "b", "c"), Array("d", "e", "f"))).toDF("letters1", "letters2")
        val zippedLetters = lettersDataset.selectExpr("zip_with(letters1, letters2,  (letter1, letter2) -> concat(letter1, ' -> ', letter2))" +
          " AS zipped_letters")
    
        println(zippedLetters.collect().map(ro => ro.mkString(",")).mkString("\n"))
        val functionResult = stringifyResult(zippedLetters, "zipped_letters")
        functionResult should have size 1
        functionResult(0) shouldEqual "a -> d, b -> e, c -> f"
        zippedLetters.explain(true)
      }
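
If you come from Scala, it may help to read these 5 functions as counterparts of well-known collection methods. The sketch below uses plain Scala collections rather than Spark, with the same sample letters as the tests; the `CollectionAnalogues` object and its field names are invented for the illustration:

```scala
object CollectionAnalogues {
  val letters = Seq("a", "b", "c")
  val otherLetters = Seq("d", "e", "f")

  // transform(letters, (letter, i) -> ...) behaves like zipWithIndex followed by map
  val transformed: Seq[String] =
    letters.zipWithIndex.map { case (letter, i) => s"index $i value = $letter" }

  // filter(letters, letter -> letter == 'a') behaves like filter
  val filtered: Seq[String] = (letters :+ "a").filter(letter => letter == "a")

  // aggregate(letters, 'letters', (prefix, letter) -> ...) behaves like foldLeft
  val aggregated: String =
    letters.foldLeft("letters")((prefix, letter) => s"$prefix -> $letter")

  // exists(letters, letter -> letter = 'a') behaves like exists
  val containsA: Boolean = letters.exists(letter => letter == "a")

  // zip_with(letters1, letters2, (l1, l2) -> ...) behaves like zip followed by map
  val zipped: Seq[String] =
    letters.zip(otherLetters).map { case (l1, l2) => s"$l1 -> $l2" }

  def main(args: Array[String]): Unit = {
    Seq(transformed, filtered, aggregated, containsA, zipped).foreach(println)
  }
}
```

The analogy is not perfect. For instance, Scala's zip truncates to the shorter collection, whereas zip_with pads the shorter array with nulls before applying the merge function.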
    

The umbrella ticket for all higher-order functions added in the 2.4.0 release also contained some functions manipulating maps. However, they made the weird behavior of Apache Spark's maps with duplicate keys even worse and were removed just before the release of the new framework version.

Execution plan

Unlike the previous approaches to the problem of nested data manipulation, higher-order functions come without extra serialization or shuffle overhead. The execution plans for the above tests very often look like these:

one_common_entry_flag: boolean
Project [arrays_overlap(letters1#518, letters2#519) AS one_common_entry_flag#522]
+- Project [_1#515 AS letters1#518, _2#516 AS letters2#519]
   +- LocalRelation [_1#515, _2#516]

concatenated_letters: string
Project [array_join(letters#507, ,, None) AS concatenated_letters#509]
+- Project [value#505 AS letters#507]
   +- LocalRelation [value#505]

different_letters: array
Project [array_except(letters1#496, letters2#497) AS different_letters#500]
+- Project [_1#493 AS letters1#496, _2#494 AS letters2#497]
   +- LocalRelation [_1#493, _2#494]
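
To observe the difference with the older approach, the plans of both variants can be printed side by side with explain(true). The sketch below is only an assumption-laden illustration (invented dataset, `PlanComparisonSketch` object, local master); the physical plan of the explode-based variant should contain an Exchange node for the group by, while the higher-order variant should stay a simple projection:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, explode, upper}

object PlanComparisonSketch {
  // Spark 2.4.0 way: the lambda is applied to every value inside the projection
  def withHigherOrderFunction(letters: DataFrame): DataFrame =
    letters.selectExpr("id", "transform(letters, letter -> upper(letter)) AS upper_letters")

  // Older way: explode, transform, then regroup the values
  def withExplode(letters: DataFrame): DataFrame =
    letters.select(col("id"), explode(col("letters")).as("letter"))
      .groupBy(col("id"))
      .agg(collect_list(upper(col("letter"))).as("upper_letters"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan comparison").getOrCreate()
    import spark.implicits._

    val letters = Seq((1L, Array("a", "b")), (2L, Array("c"))).toDF("id", "letters")
    // extended = true prints the logical plans as well as the physical one
    withHigherOrderFunction(letters).explain(true)
    withExplode(letters).explain(true)
    spark.stop()
  }
}
```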

The execution plan of the new functions is nothing more than a new projection returning a renamed field. This new field is created with one of the expressions added in the 2.4.0 release. The operations working on 2 elements implement the BinaryArrayExpressionWithImplicitCast trait, while the rest use the UnaryExpression one. The generated code varies from one function to another but globally it looks very similar to the code we would write with the help of UDFs:

// reverse
    s"""
       |final int $numElements = $childName.numElements();
       |$initialization
       |for (int $i = 0; $i < $numElements; $i++) {
       |  int $j = $numElements - $i - 1;
       |  $assignment
       |}
       |${ev.value} = $arrayData;
     """.stripMargin

// array_position
      s"""
         |int $pos = 0;
         |for (int $i = 0; $i < $arr.numElements(); $i ++) {
         |  if (!$arr.isNullAt($i) && ${ctx.genEqual(right.dataType, value, getValue)}) {
         |    $pos = $i + 1;
         |    break;
         |  }
         |}
         |${ev.value} = (long) $pos;
       """.stripMargin

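For comparison, a hand-written equivalent of the array_position loop, of the kind we could have wrapped in a UDF, might look like the sketch below. It is plain Scala written for this comparison (`ArrayPositionSketch` and `arrayPosition` are invented names), not code taken from Apache Spark:

```scala
object ArrayPositionSketch {
  // Same logic as the generated loop: 1-based position of the first match,
  // 0 when the value is absent, null entries skipped
  def arrayPosition[T](values: Seq[T], searched: T): Long = {
    var position = 0
    var i = 0
    while (i < values.length && position == 0) {
      if (values(i) != null && values(i) == searched) {
        position = i + 1
      }
      i += 1
    }
    position.toLong
  }

  def main(args: Array[String]): Unit = {
    val letters = Seq("a", "b", "d", "e", "c")
    println(arrayPosition(letters, "b")) // found at the 2nd position
    println(arrayPosition(letters, "z")) // missing letter, hence 0
  }
}
```
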
Generated code of this kind is used by most of the new functions but not by the higher-order functions, which rely on another new concept of Apache Spark 2.4.0: the org.apache.spark.sql.catalyst.expressions.LambdaFunction expression. It's the case, for instance, of transform:

transformed_letters: array
Project [transform(letters#6, lambdafunction(concat(lambda letter1#15,  , cast(lambda letter2#16 as string)), lambda letter1#15, lambda letter2#16, false)) AS transformed_letters#9]
+- Project [_1#2L AS id#5L, _2#3 AS letters#6]
   +- LocalRelation [_1#2L, _2#3]

The execution depends on the operation. For instance, the transform function, which is the equivalent of the map operation in functional programming, does the following:

    val f = functionForEval
    val result = new GenericArrayData(new Array[Any](arr.numElements))
    var i = 0
    while (i < arr.numElements) {
      elementVar.value.set(arr.get(i, elementVar.dataType))
      if (indexVar.isDefined) {
        indexVar.get.value.set(i)
      }
      result.update(i, f.eval(inputRow))
      i += 1
    }
    result

The exists function, in turn, is responsible for:

    val f = functionForEval
    var exists = false
    var i = 0
    while (i < arr.numElements && !exists) {
      elementVar.value.set(arr.get(i, elementVar.dataType))
      if (f.eval(inputRow).asInstanceOf[Boolean]) {
        exists = true
      }
      i += 1
    }
    exists
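
Both snippets follow the same pattern: the lambda body is an expression evaluated once per element, after the current element (and optionally its index) has been written into a variable that the body reads. A toy model of that mechanism in plain Scala, with no Catalyst classes involved, could look like this; `LambdaVariable` and the 2 functions are simplified stand-ins invented for the illustration:

```scala
import java.util.concurrent.atomic.AtomicReference

object LambdaEvaluationSketch {
  // Toy stand-in for a lambda variable: a slot whose value is replaced for every element
  final class LambdaVariable[T] {
    val value = new AtomicReference[T]()
  }

  // Evaluates `body` once per element, after binding the element and the optional index
  def transform[A, B](arr: Seq[A], elementVar: LambdaVariable[A],
                      indexVar: Option[LambdaVariable[Int]])(body: => B): Seq[B] = {
    val result = Seq.newBuilder[B]
    var i = 0
    while (i < arr.length) {
      elementVar.value.set(arr(i))
      indexVar.foreach(_.value.set(i))
      result += body
      i += 1
    }
    result.result()
  }

  // Stops at the first element for which the predicate evaluates to true
  def exists[A](arr: Seq[A], elementVar: LambdaVariable[A])(predicate: => Boolean): Boolean = {
    var found = false
    var i = 0
    while (i < arr.length && !found) {
      elementVar.value.set(arr(i))
      if (predicate) found = true
      i += 1
    }
    found
  }

  def main(args: Array[String]): Unit = {
    val letter = new LambdaVariable[String]
    val index = new LambdaVariable[Int]
    val transformed = transform(Seq("a", "b", "c"), letter, Some(index)) {
      s"index ${index.value.get} value = ${letter.value.get}"
    }
    println(transformed.mkString(", "))
    println(exists(Seq("a", "b", "c"), letter)(letter.value.get == "b"))
  }
}
```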

As you can notice, both look like the UDFs we could write in previous versions of Apache Spark.

The 2.4.0 release of Apache Spark brought a lot of new function-related features. The most important of them are higher-order functions that help to work with nested data structures such as arrays. As shown throughout this post, they are functions taking other functions as parameters. Another new feature is the enriched list of functions we can use to manipulate arrays. As illustrated in the 2nd part of the post, dozens of them were implemented in the 2.4.0 version to make array processing easier.
