Spark SQL Cost-Based Optimizer

Versions: Spark 2.2.0

Prior to the Spark 2.2.0 release, query optimization was based on a set of heuristic rules that ignored the characteristics of the data. That release brought a tool well known from the RDBMS world: the Cost-Based Optimizer.

This post describes this major improvement in the Spark library. The first part presents the general ideas of the Cost-Based Optimizer (CBO) as implemented in RDBMS. Only the second and third sections talk about the CBO in the Spark SQL context. The second part explains how it was implemented in Spark SQL. The third section shows some examples of the Cost-Based Optimizer in use through learning tests.

Cost-Based Optimizer in RDBMS

The role of the Cost-Based Optimizer (CBO) in an RDBMS is to choose the cheapest execution plan for each query. The CBO tries to minimize the resources (I/O, CPU) consumed by the execution and thus to return the results as quickly as possible.

How does it proceed? The CBO analyzes the available execution scenarios and assigns a cost to each of them. In the end, the scenario with the lowest cost is chosen for the physical execution. The costs are computed from statistics collected about the data stored in the tables involved in the query. For instance, a query filtering on the primary key in its WHERE clause will probably have a lower cost than the same query filtering on a non-indexed column.
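The selection step can be illustrated with a minimal sketch. It is not taken from any real database engine: the CandidatePlan type, the cost numbers and the plain "I/O + CPU" sum are assumptions made only to show the idea of picking the lowest-cost scenario.

```scala
// Hypothetical, simplified model of a candidate execution plan.
// Real optimizers use much richer cost formulas than this one.
final case class CandidatePlan(description: String, ioCost: Double, cpuCost: Double) {
  // Total cost as a plain sum; the equal weighting is an assumption for illustration.
  def totalCost: Double = ioCost + cpuCost
}

object CheapestPlanDemo {
  // Returns the candidate with the lowest total cost,
  // or None when there is nothing to compare.
  def cheapest(candidates: Seq[CandidatePlan]): Option[CandidatePlan] =
    if (candidates.isEmpty) None else Some(candidates.minBy(_.totalCost))

  def main(args: Array[String]): Unit = {
    // Made-up costs: a lookup by primary key is assumed to be far cheaper
    // than scanning the whole table and filtering afterwards.
    val candidates = Seq(
      CandidatePlan("full table scan + filter", ioCost = 120.0, cpuCost = 60.0),
      CandidatePlan("primary key lookup", ioCost = 4.0, cpuCost = 1.0)
    )
    cheapest(candidates).foreach { plan =>
      println(s"chosen: ${plan.description} (cost ${plan.totalCost})")
    }
  }
}
```

With these invented numbers the primary key lookup wins, which matches the intuition from the example above: the better the statistics and access paths, the more the costs of the candidates differ.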

It's quite easy to see how the CBO behaves. In most cases (MySQL, PostgreSQL), the EXPLAIN command shows the execution plan. In other databases (e.g. Oracle), the command is very similar: EXPLAIN PLAN. For instance, the output of MySQL's EXPLAIN for a query on a table with 3 columns (id, first name, last name) could look like:

mysql> explain format=json  SELECT * FROM user WHERE first_name != "a";
{
  "query_block": {
    "select_id": 1,
    "cost_info": {
      "query_cost": "1.80"
    },
    "table": {
      "table_name": "user",
      "access_type": "ALL",
      "rows_examined_per_scan": 4,
      "rows_produced_per_join": 3,
      "filtered": "75.00",
      "cost_info": {
        "read_cost": "1.20",
        "eval_cost": "0.60",
        "prefix_cost": "1.80",
        "data_read_per_join": "96"
      },
      "used_columns": [
        "id",
        "first_name",
        "last_name"
      ],
      "attached_condition": "(`wfc_tests`.`user`.`first_name` <> 'a')"
    }
  }
} 

Spark SQL implementation

At the time of writing (2.2.0 released), Spark SQL cost-based optimization is disabled by default and can be activated through the spark.sql.cbo.enabled property. When enabled, it applies to filtering, projection, joins and aggregations, as we can see in the corresponding estimation objects from the org.apache.spark.sql.catalyst.plans.logical.statsEstimation package: FilterEstimation, ProjectEstimation, JoinEstimation and AggregateEstimation.

Even if at first glance the use of the estimation objects seems to be conditioned only by the configuration property, that's not the whole story. Spark's CBO is applied only when statistics about the manipulated data are known (read more about them in the post devoted to Statistics in Spark SQL). This condition is expressed by the following EstimationUtils method:

def rowCountsExist(conf: SQLConf, plans: LogicalPlan*): Boolean =
  plans.forall(_.stats(conf).rowCount.isDefined)

Filtering is a special case: instead of calling this method, the filter estimation checks directly whether the row count of its child exists and returns no estimate otherwise:

if (childStats.rowCount.isEmpty) return None
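To make both guards easier to follow, here is a minimal standalone sketch that does not depend on Spark. SimpleStats, SimplePlan and the selectivity parameter are hypothetical stand-ins, not Spark's real classes; only the shape of the two checks mirrors the snippets above.

```scala
// Hypothetical stand-ins for Spark's statistics and logical plan classes.
final case class SimpleStats(sizeInBytes: BigInt, rowCount: Option[BigInt])
final case class SimplePlan(name: String, stats: SimpleStats)

object RowCountGuards {
  // Same shape as rowCountsExist: every involved plan must expose a row count,
  // otherwise cost-based estimation is skipped.
  def rowCountsExist(plans: SimplePlan*): Boolean =
    plans.forall(_.stats.rowCount.isDefined)

  // Same shape as the filter guard: no child row count means no estimate.
  // The selectivity-based formula and the truncation to a whole number are
  // simplifications made for illustration, not Spark's actual computation.
  def estimateFilteredRows(child: SimplePlan, selectivity: Double): Option[BigInt] =
    child.stats.rowCount.map { rows =>
      (BigDecimal(rows) * BigDecimal(selectivity)).toBigInt
    }

  def main(args: Array[String]): Unit = {
    val analyzed = SimplePlan("analyzed_table", SimpleStats(BigInt(64000), Some(BigInt(1000))))
    val notAnalyzed = SimplePlan("raw_table", SimpleStats(BigInt(64000), None))

    println(rowCountsExist(analyzed))              // only the analyzed table is involved
    println(rowCountsExist(analyzed, notAnalyzed)) // one plan lacks a row count
    println(estimateFilteredRows(analyzed, 0.25))
    println(estimateFilteredRows(notAnalyzed, 0.25))
  }
}
```

The point of the sketch is that a single plan without a row count is enough to disable the estimation for the whole operation, which is why collecting statistics beforehand matters so much.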

The statistics can be gathered by executing the ANALYZE TABLE $TABLE_NAME COMPUTE STATISTICS command before the processing. The ANALYZE command is executed by org.apache.spark.sql.execution.command.AnalyzeTableCommand#run(SparkSession), which updates the statistics of the processed table in org.apache.spark.sql.catalyst.catalog.SessionCatalog.
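Putting the configuration property and the ANALYZE command together, a session could be prepared as in the sketch below. It assumes a Spark 2.2 application with Hive support on the classpath; the orders table, its columns and the application name are hypothetical and exist only for the example.

```scala
import org.apache.spark.sql.SparkSession

object CboSetupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cbo-setup-sketch")              // hypothetical application name
      .master("local[*]")
      .config("spark.sql.cbo.enabled", "true")  // CBO is disabled by default
      .enableHiveSupport()                      // assumption: a Hive-backed catalog is available
      .getOrCreate()

    // Hypothetical table used only for the illustration.
    spark.sql("CREATE TABLE IF NOT EXISTS orders (id INT, amount DOUBLE)")
    spark.sql("INSERT INTO orders VALUES (1, 10.0), (2, 25.5)")

    // Collect table-level statistics (row count, size) before querying.
    spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")

    // The table description should now include the collected statistics.
    spark.sql("DESCRIBE EXTENDED orders").show(100, truncate = false)

    spark.stop()
  }
}
```

If the statistics line is missing from the description, the guards shown earlier will find no row count and the cost-based estimations will be skipped even though the property is enabled.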

The only problem with the ANALYZE command is that it can be called only for Hive and in-memory data stores. Initially the ANALYZE command supported only Hive, and the code was quite self-explanatory thanks to the comment: "Right now, it only supports Hive tables and it only updates the size of a Hive table in the Hive metastore.". Unfortunately the comment was removed in the Pull Request "[SPARK-17072][SQL] support table-level statistics generation and storing into/loading from metastore". This could lead us to think that the ANALYZE command can be interpreted by other storages as well. But as the demo video in the last section shows, that's not true.

Spark SQL CBO in action

The videos below show how Spark SQL executes queries. Four cases were analyzed: Hive querying with and without the ANALYZE command, and MySQL querying with and without the ANALYZE command (obviously a CBO is natively implemented in the RDBMS itself, but MySQL was chosen as a counter-example to show the failure of the ANALYZE command and the non-execution of Spark's CBO). As expected, only the first case used the CBO and the statistics feature:

Cost-Based Optimization is the next step for Spark SQL towards being fully considered as a distributed database. The first section showed that the concept of CBO is widely implemented in RDBMS such as MySQL and that it helps to reduce the execution time of queries. The second part introduced the CBO in Spark SQL and especially its constraints: remember, it works only for data sources supporting the ANALYZE TABLE command, such as Hive. As proven in the last section through the MySQL tests, Spark's CBO is used neither for an RDBMS source nor for Hive tables that were not analyzed with the ANALYZE TABLE command.