Custom projection pushdown in Apache Spark SQL for JSON columns

Versions: Apache Spark 2.3.1 https://github.com/bartosz25/spark-.../CustomProjectionPushDownTest.scala

Most RDBMS are able to store JSON documents in columns of a JSON-like type. One of them is PostgreSQL, which can keep JSONs in one of 2 column types (JSON or JSONB) and natively enables querying JSON document attributes. As we'll see below, with a little bit of effort we can implement similar behavior in Apache Spark SQL.


This post talks about a small problem we can encounter with big JSON documents stored in a database. The first part gives more context about when it can happen. The second one shows, through built-in Apache Spark SQL JDBC options, how we can solve it.

Context

Apache Spark SQL is able to work with JSON data through the from_json(column: Column, schema: StructType) function. But it involves a point that sometimes we don't want: moving all the JSON data from the RDBMS to Apache Spark's compute engine and only then applying the operation that extracts some of the JSON fields. This situation is shown in the following code snippet:

private val CommonJdbcOptions = Map("url" -> "jdbc:postgresql://localhost:5432/spark_test",
  "dbtable" -> "friends", "user" -> "root",
  "password" -> "root", "driver" -> "org.postgresql.Driver")

// collectAsList() returns Rows, hence the expected values are defined as Rows too
private val ExpectedUsers = (1 to 40).map(nr => Row(s"user${nr}", s"Relation ${nr}"))

"JSON column" should "be selected in 2 steps with from_json function" in {
  val allUsers = sparkSession.read
    .format("jdbc")
    .options(CommonJdbcOptions)
    .load()

  val jsonSchema = StructType(
    Seq(
      StructField("changes", IntegerType),
      StructField("fullName", StringType),
      StructField("important", BooleanType),
      StructField("businessValue", IntegerType)
    )
  )
  import sparkSession.implicits._
  // Since the "friends_list" JSON column is retrieved as a StringType, we need some extra work to extract
  // its content. If you're interested why, take a look at [[org.apache.spark.sql.jdbc.PostgresDialect#toCatalystType]]
  // method where both JSON and JSONB types are considered as StringType:
  // ```
  // case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
  //      Some(StringType)
  // ```
  val usersWithExtractedFriendName = allUsers.select($"user_name", $"friends_list")
    // Please note that we could try to solve it more directly with
    // `.withColumn("fullName", explode($"friends_list.fullName"))` but it won't work because of this error:
    // ```
    // Can't extract value from friends_list#1: need struct type but got string;
    // ```
    .withColumn("friends_json", from_json($"friends_list", jsonSchema))
    .select($"user_name", $"friends_json.fullName")

  val localUsersWithFriend = usersWithExtractedFriendName.collectAsList()
  localUsersWithFriend should have size 40
  localUsersWithFriend should contain allElementsOf(ExpectedUsers)
}
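
To see what this first version actually transfers, we can inspect the Dataset itself. A minimal sketch, assuming the allUsers and usersWithExtractedFriendName values from the test above:

// friends_list arrives as a plain StringType column (cf. PostgresDialect#toCatalystType),
// so the whole JSON document is sent over the network before from_json parses it in Spark
allUsers.printSchema()

// explain(true) prints the parsed, analyzed, optimized and physical plans; the JDBC scan
// node shows that only top-level columns (user_name, friends_list) are pruned - nothing
// inside the JSON document is pushed down to PostgreSQL
usersWithExtractedFriendName.explain(true)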

Often reading the whole JSON per column isn't a big deal, either because the JSON documents are small or because almost all their fields are used in the processing. But sometimes neither is true and only a small subset of fields from a big document is used. In such a case we would like to reduce the amount of data moved across the network and make some kind of "custom projection pushdown" at the database level. We would like to push as many operations as possible down to the data source level, exactly as predicate pushdown does in Spark SQL. Please note that I called this pushdown "custom" because the engine already does a projection pushdown that selects only the columns needed in the processing. Unfortunately, it doesn't apply to the nested fields of a JSON column, as the sketch below illustrates. In the rest of the post, this term will be used interchangeably with "nested fields projection pushdown".
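
Here is a minimal sketch of that difference, reusing the CommonJdbcOptions map from the first test: a top-level column benefits from the built-in projection pushdown, while a field nested inside friends_list does not:

// Built-in projection pushdown: only user_name is requested from PostgreSQL, which is
// visible in the ReadSchema of the JDBC scan node printed by explain(true)
val onlyUserNames = sparkSession.read
  .format("jdbc")
  .options(CommonJdbcOptions)
  .load()
  .select("user_name")
onlyUserNames.explain(true)

// There is no equivalent for friends_list.fullName: as soon as that attribute is needed,
// the entire friends_list string is fetched and parsed in Spark with from_json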

"Nested fields projection pushdown" in Apache Spark SQL

Another reason for calling this projection custom is that it relies on the execution plan generated by the data source. Sometimes it may turn out to be inefficient, so if you use this solution, you should monitor what happens at the database level. But for now let's simply suppose it works efficiently. In order to extract just the information we're interested in, we'll use PostgreSQL's -> operator and Apache Spark's dbtable option with a custom SELECT defined inside:

"custom subquery in dbtable" should "extract only one JSON attribute" in {
  val jdbcOptions = CommonJdbcOptions ++ Map(
    "dbtable" -> "(SELECT user_name, friends_list->'fullName' AS fullName, partition_nr FROM friends) AS friends_table",
    "partitionColumn" -> "partition_nr", "numPartitions" -> "2", "lowerBound" -> "1",
    "upperBound" -> "3")
  val allUsersWithFriend = sparkSession.read
    .format("jdbc")
    .options(jdbcOptions)
    .load()

  val localUsersWithFriend = allUsersWithFriend.collectAsList()
  localUsersWithFriend should have size 40
  localUsersWithFriend should contain allElementsOf(ExpectedUsers) 
}
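
A side note on the operator: -> returns the attribute as json/jsonb, so the value may keep its JSON double quotes once rendered as a string, while ->> returns it directly as text. A minimal variant of the options above using ->> (the full_name alias and value names are only illustrative):

// Same partitioned read, but ->> extracts fullName as text, so the value arrives without
// the surrounding JSON double quotes
val textExtractionOptions = CommonJdbcOptions ++ Map(
  "dbtable" -> ("(SELECT user_name, friends_list->>'fullName' AS full_name, partition_nr " +
    "FROM friends) AS friends_table"),
  "partitionColumn" -> "partition_nr", "numPartitions" -> "2",
  "lowerBound" -> "1", "upperBound" -> "3")
val usersWithTextFriendName = sparkSession.read
  .format("jdbc")
  .options(textExtractionOptions)
  .load()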

If you analyze the queries executed by PostgreSQL, you'll find that it returns to the Apache Spark engine only the columns defined in the projection. Thus, only one of the JSON's attributes is returned, as the fullName column. Thanks to that we achieved the initial goal of reducing the amount of transferred data and, moreover, we execute most of the operations at the data source level.

Fine, but doesn't it impact the performance? At first glance we could think so: the initial supposition about the physical execution plan of the query is that it requires a sequential scan of the whole table, with the partitioning conditions applied only afterwards. But as we can see in the execution plan explanation, it's not true and PostgreSQL is smart enough to treat our nested subquery as a normal one. The WHERE clause in the queries below is the predicate Spark generates for the first of the two JDBC partitions (partition_nr < 2 OR partition_nr IS NULL, derived from lowerBound=1, upperBound=3 and numPartitions=2). It becomes obvious when we compare the 2 queries, beginning with the plan for the flat one (note: the index is not used because of the small volume of data; read the next paragraph to see what happens with many more rows):

spark_test=# EXPLAIN ANALYZE SELECT user_name, friends_list->'fullName'
 AS friend_name, partition_nr FROM friends WHERE partition_nr < 2 or partition_nr is null;
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Seq Scan on friends  (cost=0.00..1.53 rows=13 width=74) (actual time=0.013..0.029 rows=15 loops=1)
   Filter: ((partition_nr < 2) OR (partition_nr IS NULL))
   Rows Removed by Filter: 25
 Planning time: 0.067 ms
 Execution time: 0.050 ms
(5 rows)

And now for the query with nested subquery:

spark_test=# EXPLAIN ANALYZE (SELECT "user_name","friend_name","partition_nr" FROM (SELECT user_name, friends_list->'fullName' AS friend_name, partition_nr FROM friends) AS friends_table WHERE partition_nr < 2 or partition_nr is null);
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Seq Scan on friends  (cost=0.00..1.53 rows=13 width=74) (actual time=0.009..0.022 rows=15 loops=1)
   Filter: ((partition_nr < 2) OR (partition_nr IS NULL))
   Rows Removed by Filter: 25
 Planning time: 0.072 ms
 Execution time: 0.042 ms
(5 rows)

As you can clearly see, the execution plans are the same for both queries. Since the query used in the Apache Spark processing doesn't involve a lot of complicated steps, it's simply treated by the database engine as a kind of global alias. As mentioned in parentheses, this was fine for our initial case with dozens of rows, but let's see if the execution plans are impacted by the amount of data: 1 000 000 rows generated with the following script:

import json
import random

logs = []
insert_logs = []
for nr in range(0, 1000000):
    user_json = {"fullName": "Relation {nr}".format(nr=nr), "changes": 1,
                 "important": True, "businessValue": random.randint(1, 4000)}
    logs.append("('user{id}', '{json}', {partition})".format(
        json=json.dumps(user_json), partition=random.randint(1, 5), id=nr))
    # flush the accumulated rows as one INSERT statement every 100 rows; with (nr + 1)
    # the last batch is written too, so all 1 000 000 rows end up in the file
    if (nr + 1) % 100 == 0:
        insert_values = ', '.join(logs)
        insert_logs.append("INSERT INTO friends(user_name, friends_list, partition_nr) VALUES {values};".format(values=insert_values))
        logs = []

output_file = open('./queries_1_million.sql', 'w+')
insert_values = '\n '.join(insert_logs)
output_file.write("""
    CREATE TABLE friends (
      user_name VARCHAR(10) NOT NULL,
      friends_list JSONB NOT NULL,
      partition_nr INTEGER NOT NULL,
      PRIMARY KEY (user_name)
    );
    CREATE INDEX ON friends (partition_nr);

    {values}

    """.format(values=insert_values))
output_file.close()

After analyzing both queries we get the same execution plans:

 Bitmap Heap Scan on friends  (cost=4352.97..26558.68 rows=198514 width=46) (actual time=40.166..305.512 rows=200695 loops=1)
   Recheck Cond: ((partition_nr < 2) OR (partition_nr IS NULL))
   Heap Blocks: exact=19228
   ->  BitmapOr  (cost=4352.97..4352.97 rows=198514 width=0) (actual time=36.906..36.906 rows=0 loops=1)
         ->  Bitmap Index Scan on friends_partition_nr_idx  (cost=0.00..4249.28 rows=198514 width=0) (actual time=36.877..36.877 rows=200695 loops=1)
               Index Cond: (partition_nr < 2)
         ->  Bitmap Index Scan on friends_partition_nr_idx  (cost=0.00..4.43 rows=1 width=0) (actual time=0.025..0.025 rows=0 loops=1)
               Index Cond: (partition_nr IS NULL)

Better and clearer ways to handle this case certainly exist, though. The first one could be a real projection pushdown of the from_json function for the columns defined at the data source. Another solution could be extending the definitions we can put into the customSchema option. With that we could define our schema as user_name STRING, (friends_list->'fullName') AS fullName STRING and let Apache Spark build the schema from such expressions.
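
For reference, here is a minimal sketch of what the customSchema option supports today, assuming the jdbcOptions map with the subquery defined earlier: it only overrides the Catalyst types of the columns returned by the query, whereas the expression-based syntax imagined above is hypothetical:

// customSchema as it exists today: redefine the types of the returned columns;
// it cannot express projections such as friends_list->'fullName'
val usersWithCustomSchema = sparkSession.read
  .format("jdbc")
  .options(jdbcOptions)
  .option("customSchema", "user_name STRING, fullName STRING, partition_nr INT")
  .load()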

To recap, in this post we discovered how to build a custom projection pushdown. It's custom since it isn't automatically managed by Apache Spark, but it's possible to achieve with some of its mechanisms. However, the provided solution should be used with caution. After all, the behavior depends on the RDBMS optimizer, and if for some reason it decides not to optimize the nested subquery, the trick will do more damage than good.

