Apache Spark 2.4.0 features - array and higher-order functions

Versions: Apache Spark 2.4.0

The series about the features introduced in Apache Spark 2.4.0 continues. Today's post will cover higher-order functions that you may know from elsewhere.

The post is divided into 4 parts. The first one explains the purpose of this new feature. The second and the third ones show, respectively, the array functions and the higher-order functions added in the 2.4.0 release. The last part focuses on the execution plans of some of them.

Higher-order functions definition

If you've already heard about higher-order functions in a different context, it was probably when you were learning about functional programming. Simply speaking, a higher-order function is a function that takes other functions as parameters or returns a new function as a result. In Scala it can be illustrated as:

  def sumWithMagicNumber(nr1: Int, nr2: Int) = nr1 + nr2 + 3

  def sumWithANumber(sumFunction: (Int, Int) => Int, nr1: Int, nr2: Int) = sumFunction(nr1, nr2)

  sumWithANumber(sumWithMagicNumber, 1, 2)

If it comes from the functional paradigm, why would we need it in distributed data processing? In fact, higher-order functions are a great way to solve the problem of manipulating nested data structures such as arrays. Thanks to them we can define value-level functions and pass them to a data type-level function. It's then up to the latter to apply the value-level function on all the values it contains.

Using higher-order functions is a much easier approach than the solutions available prior to the 2.4.0 release. Before, we had 2 choices: explode the nested structure and collect it back after applying the transformation logic, or write a User Defined Function. However, both have important drawbacks. They are slow because they involve shuffling and serialization. Moreover, they are not standard methods and they may vary from one team to another, which can add a little bit of mess to the code.
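
To make the comparison concrete, here is a minimal sketch of both legacy approaches, assuming a simple upper-case transformation of an array of letters (the dataset and column names are purely illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val sparkSession = SparkSession.builder()
      .appName("Pre-2.4.0 nested data manipulation").master("local[*]").getOrCreate()
    import sparkSession.implicits._

    val letters = Seq((1L, Seq("a", "b", "c"))).toDF("id", "letters")

    // option 1: explode the array, transform the flattened rows and collect them back
    val explodedAndCollected = letters
      .select($"id", explode($"letters").as("letter"))
      .withColumn("letter", upper($"letter"))
      .groupBy("id").agg(collect_list("letter").as("letters"))

    // option 2: a User Defined Function working on the whole array at once
    val upperCaseLetters = udf((values: Seq[String]) => values.map(_.toUpperCase))
    val transformedWithUdf = letters.withColumn("letters", upperCaseLetters($"letters"))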

To mitigate all these issues, the 2.4.0 release proposed higher-order functions, i.e. functions that take another function and apply it at the level of the values defined in the nested data structure. An example is a function able to apply a map logic on the values contained in an array. It's only one case among many others that I'll present together with examples in the next sections.

Aside from higher-order functions, Apache Spark comes with a wide range of new functions to manipulate nested data, particularly arrays.

Array functions list

Let's see precisely what we can do thanks to these new functions:
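
Below is a minimal sketch rather than the full list, reusing the SparkSession and imports from the previous snippet and assuming simple arrays of letters. It includes arrays_overlap, array_join and array_except, which also appear later in the execution plan part; the expected results are given in the comments:

    val lettersPairs = Seq((Seq("a", "b", "c"), Seq("c", "d"))).toDF("letters1", "letters2")

    lettersPairs.select(
      arrays_overlap($"letters1", $"letters2").as("one_common_entry_flag"), // true
      array_except($"letters1", $"letters2").as("different_letters"),       // [a, b]
      array_join($"letters1", ",").as("concatenated_letters"),              // a,b,c
      array_position($"letters1", "b").as("position_of_b"),                 // 2 (positions are 1-based)
      reverse($"letters1").as("reversed_letters")                           // [c, b, a]
    ).show(false)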

Higher-order functions

After discovering the new functions of the 2.4.0 release, let's see some of the higher-order functions:
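
Here too a minimal sketch rather than an exhaustive list. The expressions go through selectExpr because, as far as I can tell, in the 2.4.0 release the higher-order functions are exposed as SQL expressions rather than Scala DSL functions (the column names and lambdas are illustrative):

    val numbers = Seq((1L, Seq(1, 2, 3, 4))).toDF("id", "numbers")

    numbers.selectExpr(
      "transform(numbers, number -> number * 2) AS doubled_numbers",                        // [2, 4, 6, 8]
      "filter(numbers, number -> number % 2 = 0) AS even_numbers",                          // [2, 4]
      "exists(numbers, number -> number > 3) AS has_number_greater_than_3",                 // true
      "aggregate(numbers, 0, (accumulator, number) -> accumulator + number) AS numbers_sum" // 10
    ).show(false)

Note that transform also accepts a 2-argument lambda receiving the element and its index, which appears to be the variant used in the transform execution plan shown later in this post.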

The umbrella ticket for all higher-order functions added in the 2.4.0 release also contained some functions manipulating maps. However, they made the already weird behavior of Apache Spark's maps with duplicate keys even worse and were removed just before releasing the new framework version.

Execution plan

Unlike the previous approaches to the problem of nested data manipulation, higher-order functions come without an extra serialization or shuffle overhead. The execution plans for the above tests very often look like these ones:

one_common_entry_flag: boolean
Project [arrays_overlap(letters1#518, letters2#519) AS one_common_entry_flag#522]
+- Project [_1#515 AS letters1#518, _2#516 AS letters2#519]
   +- LocalRelation [_1#515, _2#516]

concatenated_letters: string
Project [array_join(letters#507, ,, None) AS concatenated_letters#509]
+- Project [value#505 AS letters#507]
   +- LocalRelation [value#505]

different_letters: array<string>
Project [array_except(letters1#496, letters2#497) AS different_letters#500]
+- Project [_1#493 AS letters1#496, _2#494 AS letters2#497]
   +- LocalRelation [_1#493, _2#494]
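
Such plans can be printed with explain. For instance, the first one can be reproduced more or less with the following sketch, reusing the imports from the earlier snippets (the input values are illustrative):

    Seq((Seq("a", "b"), Seq("b", "c"))).toDF("letters1", "letters2")
      .select(arrays_overlap($"letters1", $"letters2").as("one_common_entry_flag"))
      .explain(true)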

The execution plan is nothing more than a new projection returning a renamed field. This new field is created with one of the expressions added in the 2.4.0 release. The operations working on 2 elements implement the BinaryArrayExpressionWithImplicitCast trait, while the rest use the UnaryExpression one. The generated execution code varies from one function to another but globally it looks very similar to the code we would write with the help of UDFs:

// reverse
    s"""
       |final int $numElements = $childName.numElements();
       |$initialization
       |for (int $i = 0; $i < $numElements; $i++) {
       |  int $j = $numElements - $i - 1;
       |  $assignment
       |}
       |${ev.value} = $arrayData;
     """.stripMargin

// array_position
      s"""
         |int $pos = 0;
         |for (int $i = 0; $i < $arr.numElements(); $i ++) {
         |  if (!$arr.isNullAt($i) && ${ctx.genEqual(right.dataType, value, getValue)}) {
         |    $pos = $i + 1;
         |    break;
         |  }
         |}
         |${ev.value} = (long) $pos;
       """.stripMargin

The above applies to most of the new functions but not to the higher-order functions, which use another new concept of Apache Spark 2.4.0, the org.apache.spark.sql.catalyst.expressions.LambdaFunction expression. It's for instance the case of transform:

transformed_letters: array<string>
Project [transform(letters#6, lambdafunction(concat(lambda letter1#15,  , cast(lambda letter2#16 as string)), lambda letter1#15, lambda letter2#16, false)) AS transformed_letters#9]
+- Project [_1#2L AS id#5L, _2#3 AS letters#6]
   +- LocalRelation [_1#2L, _2#3]

The execution depends on the operation. For instance, the transform function, which is the synonym of the map operation in functional programming, does the following:

    val f = functionForEval
    val result = new GenericArrayData(new Array[Any](arr.numElements))
    var i = 0
    while (i < arr.numElements) {
      elementVar.value.set(arr.get(i, elementVar.dataType))
      if (indexVar.isDefined) {
        indexVar.get.value.set(i)
      }
      result.update(i, f.eval(inputRow))
      i += 1
    }
    result

The exists function, in its turn, is responsible for:

    val f = functionForEval
    var exists = false
    var i = 0
    while (i < arr.numElements && !exists) {
      elementVar.value.set(arr.get(i, elementVar.dataType))
      if (f.eval(inputRow).asInstanceOf[Boolean]) {
        exists = true
      }
      i += 1
    }
    exists

As you can see, both look like the UDFs we could write in previous versions of Apache Spark.
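
For instance, a minimal, hand-written equivalent of the exists logic above, with the predicate hardcoded to number > 3, could look like this sketch (the UDF name is illustrative):

    val existsGreaterThan3 = udf((numbers: Seq[Int]) => {
      // like the interpreted code above, stops at the first element matching the predicate
      numbers.exists(number => number > 3)
    })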

The 2.4.0 release of Apache Spark brought a lot of new function-related features. The most important of them are the higher-order functions that help to work with nested data structures such as arrays. As shown throughout this post, they consist of functions taking other functions as parameters. Another new feature is the enriched list of functions we can use to manipulate arrays. As illustrated in the 2nd part of the post, dozens of them were implemented in the 2.4.0 version to make array processing easier.