Spark SQL statistics

Versions: Spark 2.2.0

Spark SQL has a lot of "hidden" features that make it an efficient processing tool. Statistics are one of them.

This post explains the Statistics class, which Spark SQL uses to choose data processing strategies. The first part defines the role of this class, shows the different places where it's used and describes how its values are computed. The second part contains several learning tests showing statistics computation in query execution plans.

Statistics in Spark SQL

According to the documentation, org.apache.spark.sql.catalyst.plans.logical.Statistics is responsible for estimating "various statistics". Initially it computed only the physical size of a logical plan. Over time it has been enriched with new information: the estimated number of rows, column statistics (min/max value, number of nulls, number of distinct values, average length of values) and query hints (in particular, whether a broadcast can be applied).
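
To make this list of fields more concrete, here is a minimal sketch of the kind of information such a class carries. The names StatisticsSketch, ColumnStatSketch and broadcastHint are hypothetical stand-ins invented for this illustration and do not reproduce Spark's real class signatures; the values describe the 5-row shops dataset used in the tests later in this post.

```scala
// Hypothetical, simplified stand-ins: NOT Spark's real classes or signatures.
final case class ColumnStatSketch(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long)

final case class StatisticsSketch(
    sizeInBytes: BigInt,                                        // the original, size-only estimate
    rowCount: Option[BigInt] = None,                            // estimated number of rows, if known
    columnStats: Map[String, ColumnStatSketch] = Map.empty,     // per-column statistics, if known
    broadcastHint: Boolean = false)                             // whether a broadcast was hinted

object StatisticsSketchDemo {
  // Only the physical size is known: the "initial" shape of the statistics.
  val sizeOnly: StatisticsSketch = StatisticsSketch(sizeInBytes = BigInt(120))

  // The same plan enriched with a row count and statistics for the "id" column.
  val enriched: StatisticsSketch = sizeOnly.copy(
    rowCount = Some(BigInt(5)),
    columnStats = Map(
      "id" -> ColumnStatSketch(BigInt(5), Some(1), Some(5), BigInt(0), 4L)))
}
```

The optional fields reflect the idea that only the size is always available, while the richer information may or may not have been computed.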

Statistics are used in various places, where they are progressively being replaced by cost-based estimations that will be covered in one of the next posts. Among these places we can find:

Star schema

It's a data mart schema built from one or more fact tables and one or more dimension tables. The latter are referenced by the fact tables. Fact tables are mostly composed of simple numerical columns, while dimension tables are more descriptive. An example of a star schema is shown in the image below, where goals is the fact table and the rest are dimension tables:

Statistics computation

The statistics are computed in different ways, depending on the LogicalPlan implementation. For a basic plan node with a single child (e.g. filtering, projection), Spark uses the default implementation defined in the org.apache.spark.sql.catalyst.plans.logical.UnaryNode abstract class. This implementation is quite naive. It's based on the physical size of the child plan and on the row sizes of the child and of the output:

(childPhysicalSize*outputRowSize)/childRowSize

Where:

But this default formula is sometimes overridden by particular plans, for instance for:

The formulas for other classes can be found in the org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala file, which contains the case classes extending UnaryNode that sometimes override the default behavior.
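
The default formula (childPhysicalSize*outputRowSize)/childRowSize can be sketched in plain Scala, without any Spark dependency. This is only an illustration under stated assumptions: the object UnaryNodeSizeSketch and its methods are hypothetical names, and the sketch assumes that a row size is the sum of per-column default sizes plus a fixed 8-byte overhead, and that a result of 0 is raised to 1.

```scala
// Hypothetical helper written for this post; it mimics the formula, not Spark's code.
object UnaryNodeSizeSketch {
  // Assumption: a fixed per-row overhead is added to the sum of the column sizes.
  val RowOverheadBytes = 8

  def rowSize(columnSizes: Seq[Int]): Int = RowOverheadBytes + columnSizes.sum

  // (childPhysicalSize * outputRowSize) / childRowSize, using integer division.
  def estimate(
      childPhysicalSize: BigInt,
      childColumnSizes: Seq[Int],
      outputColumnSizes: Seq[Int]): BigInt = {
    val size =
      (childPhysicalSize * BigInt(rowSize(outputColumnSizes))) / BigInt(rowSize(childColumnSizes))
    // Assumption: the estimate is never allowed to drop to 0.
    if (size.signum == 0) BigInt(1) else size
  }
}

object UnaryNodeSizeSketchDemo {
  // Assumed default sizes: 4 bytes for an integer column, 20 bytes for a string column.
  val sameColumns: BigInt = UnaryNodeSizeSketch.estimate(BigInt(120), Seq(4, 20), Seq(4, 20)) // 120
  val onlyIntColumn: BigInt = UnaryNodeSizeSketch.estimate(BigInt(120), Seq(4, 20), Seq(4))   // 45
}
```

With these assumptions, a node that outputs the same columns as its child keeps the child's size (120 stays 120), whatever the number of rows it actually filters out, while dropping the string column shrinks the estimate to 120 * 12 / 32 = 45.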

Statistics examples

It's time to see statistics behavior through real examples:

private val sparkSession: SparkSession = SparkSession.builder().appName("Statistics test").master("local")
    .getOrCreate()

import sparkSession.implicits._
val shops = Seq(
  (1, "Shop_1"), (2, "Shop_2"), (3, "Shop_3"), (4, "Shop_4"), (5, "Shop_5")
).toDF("id", "name")
val revenues = Seq(
  (1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)
).toDF("id", "revenue")

override def afterAll(): Unit = {
  sparkSession.stop()
}

"statistics" should "remain unchanged for simple filter operation" in {
  // Keep only the shops with an even id
  val evenIdShops = shops.filter(shops("id")%2 === 0)

  val queryExecution = evenIdShops.queryExecution.toStringWithStats

  // Expected statistics:
  // == Optimized Logical Plan ==
  // Project [_1#2 AS id#5, _2#3 AS name#6], Statistics(sizeInBytes=120.0 B, hints=none)
  // +- Filter ((_1#2 % 2) = 0), Statistics(sizeInBytes=120.0 B, hints=none)
  //    +- LocalRelation [_1#2, _2#3], Statistics(sizeInBytes=120.0 B, hints=none)
  // As explained earlier in the post, the size doesn't change because the filter
  // outputs the same columns as its child
  queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
  val queryExecutionWithoutFilter = shops.queryExecution.toStringWithStats
  queryExecutionWithoutFilter should include("Statistics(sizeInBytes=120.0 B, hints=none)")
}

"statistics" should "change after joining 2 datasets" in {
  val shopsWithRevenues = shops.join(revenues)

  val queryExecution = shopsWithRevenues.queryExecution.toStringWithStats

  // Expected plan is:
  // Join Inner, Statistics(sizeInBytes=4.7 KB, hints=none)
  // :- LocalRelation [id#25, name#26], Statistics(sizeInBytes=120.0 B, hints=none)
  // +- LocalRelation [id#35, revenue#36], Statistics(sizeInBytes=40.0 B, hints=none)
  queryExecution should include("Join Inner, Statistics(sizeInBytes=4.7 KB, hints=none)")
  queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
  queryExecution should include("Statistics(sizeInBytes=40.0 B, hints=none)")
}

"statistics for limit" should "decrease" in {
  val limitedShops = shops.limit(2)

  val queryExecution = limitedShops.queryExecution.toStringWithStats

  // Expected plan is:
  // == Optimized Logical Plan ==
  // GlobalLimit 2, Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)
  // +- LocalLimit 2, Statistics(sizeInBytes=120.0 B, hints=none)
  //    +- LocalRelation [id#45, name#46], Statistics(sizeInBytes=120.0 B, hints=none)
  queryExecution should include("GlobalLimit 2, Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)")
  queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
}

"statistics" should "increase when using union operator" in {
  val newShops = Seq(
    (11, "Shop_11"), (12, "Shop_12"), (13, "Shop_13"), (14, "Shop_14"), (15, "Shop_15")
  ).toDF("id", "Name")
  val allShops = shops.union(newShops)

  val queryExecution = allShops.queryExecution.toStringWithStats

  // Expected plan is:
  // == Optimized Logical Plan ==
  // Union, Statistics(sizeInBytes=240.0 B, hints=none)
  // :- LocalRelation [id#65, name#66], Statistics(sizeInBytes=120.0 B, hints=none)
  // +- LocalRelation [id#91, Name#92], Statistics(sizeInBytes=120.0 B, hints=none)
  queryExecution should include("Union, Statistics(sizeInBytes=240.0 B, hints=none)")
  queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
}
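
The sizes asserted in these tests can be reproduced with simple arithmetic. The sketch below is plain Scala with no Spark dependency. The object PlanSizeArithmetic and its constants are assumptions made for this illustration: 4 bytes for an integer column, 20 bytes for a string column and an 8-byte per-row overhead. They were chosen because they match the numbers printed in the plans above.

```scala
// Hypothetical arithmetic helper; the constants are assumptions, not values read from Spark.
object PlanSizeArithmetic {
  val IntSize = 4      // assumed default size of an integer column
  val StringSize = 20  // assumed default size of a string column
  val RowOverhead = 8  // assumed per-row overhead

  // Local relation: sum of the column sizes multiplied by the number of rows.
  def localRelationSize(columnSizes: Seq[Int], rows: Int): BigInt =
    BigInt(columnSizes.sum) * BigInt(rows)

  val shopsSize: BigInt = localRelationSize(Seq(IntSize, StringSize), 5)     // 120 B
  val newShopsSize: BigInt = localRelationSize(Seq(IntSize, StringSize), 5)  // 120 B
  val revenuesSize: BigInt = localRelationSize(Seq(IntSize, IntSize), 5)     // 40 B

  // Join without a condition: product of the children sizes (4800 B, printed as 4.7 KB).
  val joinSize: BigInt = shopsSize * revenuesSize

  // Union: sum of the children sizes.
  val unionSize: BigInt = shopsSize + newShopsSize                           // 240 B

  // Global limit of 2 rows: row size (overhead + columns) multiplied by the limit.
  val globalLimitSize: BigInt = BigInt(RowOverhead + IntSize + StringSize) * BigInt(2) // 64 B
}
```

Note that the join estimate is deliberately pessimistic: 4800 B is far more than the 5 matching rows would really need, which is consistent with an estimation based only on the children sizes.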

Statistics are a useful metric that Spark uses to choose between some strategies. As we could see in the first section, it uses them to choose the join strategy, to detect a correct star schema or even to pick the side to limit when 2 datasets are joined. The second section showed the basic formulas used in the computation. As we learned, the main formula is based on the row sizes and on the physical size of the child plan. But we could also see that the formula changes according to the node. The differences between these operations were partially shown in the learning tests included in the last section.
