Catalyst Optimizer in Spark SQL

on waitingforcode.com

The Dataset abstraction is not the only difference between structured and unstructured data processing in Spark. Apart from that, Spark SQL uses a technique that helps to get results faster.

This technique is similar to query planning in relational databases and is carried out by the Catalyst Optimizer. The first short part of this post defines it. The second part explains the workflow applied to a query plan. The last part shows 2 queries and the steps of their optimization.

Catalyst Optimizer definition

An important element helping Dataset to perform better is the Catalyst Optimizer (CO), an internal query optimizer. It "translates" the transformations used to build the Dataset into a physical plan of execution. In that sense, it's similar to the DAG scheduler that creates the physical plan of execution for RDDs.

CO is precious to Dataset in terms of performance. Since it understands the structure of the data and the operations applied to it, the optimizer can make decisions that reduce the execution time. Thanks to functional programming constructs (pattern matching and quasiquotes), it is also open to custom optimizations (a feature still experimental at the time of writing).

Catalyst Optimizer workflow

From the big picture perspective, CO translates a query into an abstract syntax tree (AST) whose nodes represent the operations needed to execute the query. CO tries to optimize them by applying a set of predefined rules, for example combining 3 filters into a single one. Because tree nodes are immutable, applying a rule creates a new tree, which is intended to be more optimized than the previous one.

Before explaining what exactly CO does, some of the concepts it uses must be detailed first:
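The idea of a rule producing a new tree can be pictured with a minimal plain-Java sketch. The `Plan`, `Relation` and `Filter` classes and the `combineFilters` method are hypothetical names invented for this illustration; they are not Spark's classes, and real Catalyst rules are written in Scala with pattern matching.

```java
// Hypothetical, simplified plan nodes; these are NOT Spark's classes.
final class CombineFiltersSketch {

  /** A node of the plan tree. Implementations are immutable. */
  interface Plan { }

  /** Leaf node: a scan of a named table. */
  static final class Relation implements Plan {
    final String table;
    Relation(String table) { this.table = table; }
    @Override public String toString() { return "Relation(" + table + ")"; }
  }

  /** Filter node holding a textual condition and a single child. */
  static final class Filter implements Plan {
    final String condition;
    final Plan child;
    Filter(String condition, Plan child) {
      this.condition = condition;
      this.child = child;
    }
    @Override public String toString() {
      return "Filter(" + condition + ", " + child + ")";
    }
  }

  /**
   * Rule: Filter(a, Filter(b, child)) becomes Filter((b) AND (a), child).
   * The input tree is never modified; a new tree is returned.
   */
  static Plan combineFilters(Plan plan) {
    if (!(plan instanceof Filter)) {
      return plan;
    }
    Filter outer = (Filter) plan;
    Plan child = combineFilters(outer.child); // rewrite the subtree first
    if (child instanceof Filter) {
      Filter inner = (Filter) child;
      String merged = "(" + inner.condition + ") AND (" + outer.condition + ")";
      return new Filter(merged, inner.child);
    }
    return new Filter(outer.condition, child);
  }

  public static void main(String[] args) {
    Plan original = new Filter("name != 'mushrooms'",
        new Filter("LENGTH(name) > 5", new Relation("categories")));
    Plan optimized = combineFilters(original);
    System.out.println(original);  // still two nested filters
    System.out.println(optimized); // a single filter with both conditions
  }
}
```

Because the nodes only expose final fields, the original tree remains usable after the rule runs, which is what makes it safe to apply rules repeatedly.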

  • logical plan - a series of algebraic or language constructs, for example the SELECT, GROUP BY or UNION keywords in SQL. It's usually represented as a tree whose nodes are the constructs.
  • physical plan - similar to the logical plan because it's also represented as a tree. The difference is that the physical plan concerns low-level operations.
  • unoptimized/optimized plans - a plan is considered unoptimized when CO hasn't worked on it yet. The plan becomes optimized once CO has processed it and made some optimizations (e.g. merging filter() methods, replacing some instructions by other, more performant ones).

More exactly, CO works on the 3 items listed above. It helps to move from an unoptimized logical query plan to an optimized physical plan, in the steps below:

  1. CO tries to optimize the logical query plan through predefined rule-based optimizations. The optimization can consist of:
    • predicate or projection pushdown - helps to eliminate data that doesn't meet the preconditions earlier in the computation.
    • filter reordering
    • conversion of decimal operations to long integer operations
    • replacement of some RegEx expressions by Java's methods startsWith(String) or contains(String)
    • simplification of if-else clauses
  2. The optimized logical plan is created.
  3. CO constructs multiple physical plans from the optimized logical plan. A physical plan defines how the data will be computed. These plans are also optimized. The optimization can concern: merging different filter() calls, or sending predicate/projection pushdown directly to the data source to eliminate some data at the data source level.
  4. CO determines which physical plan has the lowest cost of execution and chooses it as the physical plan used for the computation. CO relies on metrics to estimate the cost of plans; this is the cost-based part of the workflow.
  5. CO generates bytecode for the best physical plan. The generation relies on a Scala feature called quasiquotes.
  6. Once a physical plan is defined, it's executed and the retrieved data is put into the Dataset.
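The RegEx replacement mentioned in step 1 can be pictured with a small plain-Java sketch. The `simplifyLike` helper below is a hypothetical name written for this post, not Spark's code, and it assumes patterns that use the `%` wildcard only at the beginning or at the end.

```java
import java.util.function.Predicate;

// Hypothetical helper, NOT Spark's code: rewrites simple LIKE patterns
// into cheaper String method calls instead of a regular expression.
final class LikeSimplificationSketch {

  /** Returns a predicate equivalent to "value LIKE pattern" for simple patterns. */
  static Predicate<String> simplifyLike(String pattern) {
    boolean leading = pattern.startsWith("%");
    boolean trailing = pattern.length() > 1 && pattern.endsWith("%");
    String literal = pattern.substring(
        leading ? 1 : 0, pattern.length() - (trailing ? 1 : 0));
    // Wildcards or escapes in the middle are out of scope for this sketch.
    if (literal.contains("%") || literal.contains("_") || literal.contains("\\")) {
      throw new IllegalArgumentException("Pattern too complex for this sketch: " + pattern);
    }
    if (leading && trailing) {
      return value -> value.contains(literal);   // '%abc%'
    }
    if (trailing) {
      return value -> value.startsWith(literal); // 'abc%'
    }
    if (leading) {
      return value -> value.endsWith(literal);   // '%abc'
    }
    return value -> value.equals(literal);       // 'abc'
  }

  public static void main(String[] args) {
    Predicate<String> startsWithSer = simplifyLike("Ser%");
    System.out.println(startsWithSer.test("Sernik"));  // true
    System.out.println(startsWithSer.test("Dessert")); // false
  }
}
```

The gain comes from avoiding the compilation and evaluation of a regular expression for every row when a direct String comparison gives the same answer.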

Cost-based optimization

With cost-based optimization, the optimizer looks at all possible scenarios for executing a given query. It assigns a cost to each of these scenarios. This value indicates the efficiency of a given scenario. The scenario with the lowest cost is then chosen and executed.

In an RDBMS, the cost is often computed from metrics such as the number of indexed elements under a given key or simply the number of rows in a table.

It's different from rule-based optimization, which simply applies a set of rules to a statement. In Dataset generation, only the 4th step is cost-based optimization. The other optimization steps are rule-based.
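The selection of the cheapest scenario can be sketched in plain Java as below. The `PhysicalPlan` class, its two metrics and the cost formula are assumptions made up for this illustration; they don't reflect the statistics or the cost model actually used by Spark.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical cost model, for illustration only.
final class CostBasedChoiceSketch {

  /** A candidate plan with two invented metrics. */
  static final class PhysicalPlan {
    final String description;
    final long estimatedRows; // assumed number of rows the plan has to process
    final double costPerRow;  // assumed relative cost of processing one row

    PhysicalPlan(String description, long estimatedRows, double costPerRow) {
      this.description = description;
      this.estimatedRows = estimatedRows;
      this.costPerRow = costPerRow;
    }

    /** Invented formula: the cost grows linearly with the number of rows. */
    double cost() {
      return estimatedRows * costPerRow;
    }
  }

  /** Returns the candidate with the lowest estimated cost. */
  static PhysicalPlan cheapest(List<PhysicalPlan> candidates) {
    return candidates.stream()
        .min(Comparator.comparingDouble(PhysicalPlan::cost))
        .orElseThrow(() -> new IllegalArgumentException("No candidate plan"));
  }

  public static void main(String[] args) {
    // The numbers below are made up for the example.
    List<PhysicalPlan> candidates = Arrays.asList(
        new PhysicalPlan("scan everything, filter afterwards", 14400, 1.0),
        new PhysicalPlan("filter at the data source", 1200, 1.5));
    System.out.println(cheapest(candidates).description);
  }
}
```

A rule-based step, by contrast, needs no such metrics: it rewrites the plan whenever its pattern matches.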

Catalyst Optimizer example

To see what happens when operations on structured data are made in Spark, the example below is used:

private static final SparkSession SESSION = SparkSession.builder() 
  .master("local[1]")
  .config("spark.ui.enabled", "false")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "/tmp/spark")
  .appName("CatalystOptimizer Test").getOrCreate();

@Test
public void should_get_dataframe_from_database() {
  // categories has 18 entries
  Dataset<Row> dataset = getBaseDataset("categories");

  Dataset<Row> filteredDataset = dataset.where("LENGTH(name) > 5")
    .where("name != 'mushrooms'")
    .limit(3);

  // To see the logical plan, the methods of filteredDataset.logicalPlan() can be used,
  // such as: treeString(true), asCode()
  // To see full execution details, filteredDataset.explain(true)
  // should be called
  assertThat(filteredDataset.count()).isEqualTo(3);
}

private Dataset<Row> getBaseDataset(String dbTable) {
  // Please note that load() below won't run the real SQL query. It will only
  // check the columns of the specified table. It can be observed with RDBMS query logs.
  // For MySQL and the meals table, the query below is generated:
  // SELECT * FROM meals WHERE 1=0
  // Only an action (such as filteredDataset.show()) will execute the query on the database.
  // It can also be checked with query logs.
  return SESSION.read()
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/fooder")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", dbTable)
    .option("user", "root")
    .option("password", "")
    .load();
}

As you can see, it returns at most 3 categories matching the defined criteria. The tree representing this expression is shown in the picture below:

To see which nodes compose the tree, Dataset's logicalPlan() proves to be useful.

Another useful command helping to understand Catalyst's optimizations also comes from the Dataset class and is called explain(boolean). Its boolean parameter determines the verbosity of the output. If it's set to true, the output contains not only the physical plan but also all phases of the logical plan (parsed, analyzed and optimized). The output for our query looks like:

== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Filter NOT (name#1 = mushrooms)
      +- Filter (length(name#1) > 5)
         +- Relation[id#0,name#1] JDBCRelation(categories) [numPartitions=1]

== Analyzed Logical Plan ==
id: int, name: string
GlobalLimit 3
+- LocalLimit 3
   +- Filter NOT (name#1 = mushrooms)
      +- Filter (length(name#1) > 5)
         +- Relation[id#0,name#1] JDBCRelation(categories) [numPartitions=1]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Filter ((length(name#1) > 5) && NOT (name#1 = mushrooms))
      +- Relation[id#0,name#1] JDBCRelation(categories) [numPartitions=1]

== Physical Plan ==
CollectLimit 3
+- *Filter (length(name#1) > 5)
   +- *Scan JDBCRelation(categories) [numPartitions=1] [id#0,name#1] PushedFilters: [*Not(EqualTo(name,mushrooms))], ReadSchema: struct

As you can see in the "Physical Plan" part, the execution workflow is composed of a database query eliminating the categories whose name is equal to "mushrooms". The rest of the work is done at Spark's level. To verify it, we can enable query logging. For the analyzed MySQL database, the query below should be executed at the database's level:

2017-02-04T08:53:44.152193Z         5 Query     SET character_set_results = NULL
2017-02-04T08:53:44.152380Z         5 Query     SET autocommit=1
2017-02-04T08:53:44.152835Z         5 Query     SELECT `id`,`name` FROM categories WHERE ((NOT (`name` = 'mushrooms')))
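The split between the filter sent to the database and the filter kept in Spark can be pictured with the plain-Java sketch below. All the names (`Condition`, `NotEqual`, `LengthGreaterThan`, `split`) are hypothetical and only serve the illustration. The sketch assumes that a comparison of a column with a literal can be translated to SQL, while a condition wrapping the column in a function call cannot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical types for illustration only; Spark's real filter classes differ.
final class PushdownSketch {

  interface Condition {
    /** SQL text if the data source can evaluate the condition, empty otherwise. */
    Optional<String> toSql();
  }

  /** column != 'value': simple enough to be translated to SQL in this sketch. */
  static final class NotEqual implements Condition {
    final String column;
    final String value;
    NotEqual(String column, String value) {
      this.column = column;
      this.value = value;
    }
    @Override public Optional<String> toSql() {
      // No escaping here; a real implementation must escape the value.
      return Optional.of("(NOT (`" + column + "` = '" + value + "'))");
    }
  }

  /** LENGTH(column) > length: assumed not translatable, so evaluated by the engine. */
  static final class LengthGreaterThan implements Condition {
    final String column;
    final int length;
    LengthGreaterThan(String column, int length) {
      this.column = column;
      this.length = length;
    }
    @Override public Optional<String> toSql() {
      return Optional.empty();
    }
  }

  /** Result of the split: SQL fragments to push and conditions left to the engine. */
  static final class Split {
    final List<String> pushed = new ArrayList<>();
    final List<Condition> residual = new ArrayList<>();
    String whereClause() {
      return pushed.isEmpty() ? "" : "WHERE " + String.join(" AND ", pushed);
    }
  }

  static Split split(List<Condition> conditions) {
    Split result = new Split();
    for (Condition condition : conditions) {
      Optional<String> sql = condition.toSql();
      if (sql.isPresent()) {
        result.pushed.add(sql.get());
      } else {
        result.residual.add(condition);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Split result = split(Arrays.<Condition>asList(
        new LengthGreaterThan("name", 5), new NotEqual("name", "mushrooms")));
    System.out.println(result.whereClause());    // part sent to the database
    System.out.println(result.residual.size());  // conditions left to the engine
  }
}
```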

When analyzing the explained plans, we can see that the optimized logical plan combined the two filters defined separately in 2 where(String) methods. To detect which rule was applied to merge them, we can take a look at Spark's logs and, more specifically, at the entries containing the "=== Applying Rule" text. For the described example we can find the entry below:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
 GlobalLimit 3                                                                  GlobalLimit 3
 +- LocalLimit 3                                                                +- LocalLimit 3
!   +- Filter NOT (name#1 = mushrooms)                                                +- Filter ((length(name#1) > 5) && NOT (name#1 = mushrooms))
!      +- Filter (length(name#1) > 5)                                                 +- Relation[id#0,name#1] JDBCRelation(categories) [numPartitions=1]
!         +- Relation[id#0,name#1] JDBCRelation(categories) [numPartitions=1]   
                 (org.apache.spark.sql.execution.SparkOptimizer:62)

Let's now analyze the second query, but only from the point of view of the physical plan:

@Test
public void should_get_data_from_bigger_table() {
  Dataset<Row> dataset = getBaseDataset("meals");

  Dataset<Row> filteredDataset = dataset.where("LENGTH(name) > 5")
    .where("name != 'mushrooms'")
    .where("name NOT LIKE 'Ser%'")
    .orderBy(new Column("name").desc())
    .limit(3);

  filteredDataset.show();
  assertThat(filteredDataset.count()).isEqualTo(3);
}

The output generated by explain() is:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[name#1 DESC NULLS LAST], output=[id#0,name#1])
+- *Filter (length(name#1) > 5)
   +- *Scan JDBCRelation(meals) [numPartitions=1] [id#0,name#1] PushedFilters: [*Not(EqualTo(name,mushrooms)), *Not(StringStartsWith(name,Ser))], ReadSchema: struct

Thus, the corresponding query looks like:

SELECT `id`,`name` FROM meals WHERE ((NOT (`name` = 'mushrooms'))) AND ((NOT (`name` LIKE 'Ser%')))

It seems a little bit strange that LIMIT and SORT weren't pushed down to the database, especially when the query operates on a table with 14400 rows. Instead, ordering and limiting of the results are done through generated code applied to the retrieved RDDs. It's also visible in the logs, which show the generated ordering.

To get the data ordered and limited, Spark uses the RDD's takeOrdered(Integer) method. It can be seen by adding debugging breakpoints or by analyzing the JSON event log, activated by the spark.eventLog.enabled and spark.eventLog.dir properties. The part about the generated RDDs is placed inside the SparkListenerStageCompleted event.
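To give an intuition of what "take the first n rows in a given order" can cost, below is a plain-Java sketch based on a bounded heap. It is only one common way of computing such a result without sorting the whole input; the `takeOrdered` method here is a hypothetical helper written for this post and it is not a copy of Spark's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, NOT Spark's implementation.
final class TakeOrderedSketch {

  /** Returns the first n elements according to the ordering, keeping at most n of them in memory. */
  static <T> List<T> takeOrdered(Iterable<T> rows, int n, Comparator<T> ordering) {
    if (n <= 0) {
      return new ArrayList<>();
    }
    // The head of the heap is the "worst" element kept so far, so it is evicted first.
    PriorityQueue<T> heap = new PriorityQueue<>(n, ordering.reversed());
    for (T row : rows) {
      if (heap.size() < n) {
        heap.add(row);
      } else if (ordering.compare(row, heap.peek()) < 0) {
        heap.poll();
        heap.add(row);
      }
    }
    List<T> result = new ArrayList<>(heap);
    result.sort(ordering);
    return result;
  }

  public static void main(String[] args) {
    // Invented meal names, ordered by name descending like in the query above.
    List<String> names = Arrays.asList("soup", "pancakes", "omelette", "risotto", "lasagne");
    System.out.println(takeOrdered(names, 3, Comparator.<String>reverseOrder()));
    // prints [soup, risotto, pancakes]
  }
}
```

Even with such a technique every row matching the pushed filters still has to be read from the database, which is why pushing LIMIT and ORDER BY down would be cheaper for a big table.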

This post provides a little insight into the Catalyst Optimizer used in the Spark SQL module. The first part briefly defines it. The second part goes into more detail and shows the workflow transforming a parsed query into an optimized physical plan. The last part, through 2 examples of different query complexity, shows how CO really behaves. It illustrates what was described previously: pushdown features as well as dynamic code generation.
