Spark SQL has a lot of "hidden" features that make it an efficient processing tool. Statistics are one of them.

This post explains the Statistics class used to coordinate data processing strategies. The first part defines the role of this class in Spark SQL. It also shows the different places where it's used and how the statistics are computed. The second part contains several learning tests showing statistics computation in query execution plans.

## Statistics in Spark SQL

According to the documentation, *org.apache.spark.sql.catalyst.plans.logical.Statistics* is responsible for estimating "various statistics". Initially it computed only the physical size of a logical plan. Over time it has been enriched with new information: the estimated number of rows, column statistics (min/max value, number of nulls, number of distinct values, average length of values) and query hints (especially whether a broadcast can be applied).

Statistics are used in various places, and they are progressively being replaced with cost-based estimations that will be covered in one of the next posts. Among these places we can find:

- join strategy resolution - the stats are used here to determine the Spark SQL join strategy. Spark checks if the statistics are broadcastable (then one side of the join can be broadcast to all executors) or, for the other join types, if the size of the plan fits within the specified threshold.
- star schema detection - in this case the Statistics instance is used to find the fact and dimension tables in a *star schema*. The detection algorithm uses 2 rules: the primary key (dimension tables) - foreign key (fact tables) dependency and cardinality (fact tables are bigger than dimension tables). To detect both of these facts, the algorithm uses the estimated number of rows and the column statistics (number of nulls and distinct values).
- full outer join limits handling - if this kind of join is applied in conjunction with a LIMIT clause, Spark tries to apply the limit on only 1 side. This way the rows of the limited side can still be matched against all rows of the other, non-limited side. Spark uses Statistics only when it doesn't know the maximum number of rows for either side. In this situation it compares the sizes of both sides and limits either the left or the right side of the join query.
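
The broadcast check from the first bullet can be sketched in plain Scala, without Spark on the classpath. This is a simplified model and not Spark's actual code: `BroadcastSketch`, `PlanStats` and `canBroadcast` are hypothetical names, and the 10 MB value mirrors the default of `spark.sql.autoBroadcastJoinThreshold`. The real check lives in Spark's join selection logic and may differ between versions.

```scala
// Simplified model of the broadcast decision; all names are hypothetical.
object BroadcastSketch {
  // Stand-in for the two pieces of Statistics used by the check.
  final case class PlanStats(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // Default of spark.sql.autoBroadcastJoinThreshold: 10 MB.
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // A side is considered broadcastable if it carries an explicit hint,
  // or if its estimated size is non-negative and fits within the threshold.
  def canBroadcast(stats: PlanStats, threshold: Long = DefaultThreshold): Boolean =
    stats.broadcastHint ||
      (stats.sizeInBytes >= 0 && stats.sizeInBytes <= threshold)

  def main(args: Array[String]): Unit = {
    val smallDimension = PlanStats(sizeInBytes = 120)
    val bigFact = PlanStats(sizeInBytes = BigInt(50L * 1024 * 1024))

    println(canBroadcast(smallDimension))                      // true
    println(canBroadcast(bigFact))                             // false
    println(canBroadcast(bigFact.copy(broadcastHint = true)))  // true
  }
}
```

With a negative threshold the size condition can never hold in this model, so only an explicit hint would make a side broadcastable.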

### Star schema

It's a data mart schema built from 1 or more fact tables and 1 or more dimension tables. The latter are referenced by the fact tables. The fact tables are mostly composed of simple numerical columns while the dimension tables are more descriptive. An example of a star schema is shown in the image below, where goals is the fact table and the rest are dimension tables:

## Statistics computation

The statistics are computed in different ways, depending on the LogicalPlan implementation. For a basic plan node with a single child (e.g. filtering, projection), Spark uses the default implementation defined in the *org.apache.spark.sql.catalyst.plans.logical.UnaryNode* abstract class. This implementation is quite naive. It's based on the physical size of the child plan and on the row sizes of the child and of the output:

(childPhysicalSize*outputRowSize)/childRowSize

Where:

- *childPhysicalSize* - the estimated size of the data. For instance, if we use an in-memory DataFrame (built with the .toDF function), the physical size is computed as the size of each row multiplied by the number of rows. So if a row is composed of one integer column (size of 4) and we have 20 rows, the physical size will be 80 (4*20).

  In the case of other relations, for instance JDBCRelation, the size can't be computed in the same manner (the data is not loaded). Thus Spark uses the value defined in the *spark.sql.defaultSizeInBytes* property. It's used directly in the JDBCRelation class as the sizeInBytes method: *def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes*
- *outputRowSize* - the size of one row returned by the given logical plan, computed as the sum of the default sizes of each column type plus an overhead of 8 (used to avoid division by 0). For instance, if the result is composed of 2 columns (IntegerType, StringType), with respective default sizes of 4 and 20, each row will have the size of 32 (4 + 20 + 8).
- *childRowSize* - the same as above but applied to the child plan.
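
To make the default formula concrete, here is a small worked example in plain Scala. It is only a sketch of the arithmetic described above, not Spark's implementation: `UnaryNodeSizeSketch`, `rowSize` and `estimateSize` are hypothetical names, and the input is an assumed in-memory relation of 5 rows with an integer and a string column.

```scala
// Worked example of (childPhysicalSize * outputRowSize) / childRowSize.
// All names are hypothetical; only the arithmetic follows the text.
object UnaryNodeSizeSketch {
  // Default sizes per type as given in the text.
  val IntSize = 4
  val StringSize = 20
  // Fixed per-row overhead that keeps the divisor above zero.
  val RowOverhead = 8

  def rowSize(columnSizes: Seq[Int]): Int = columnSizes.sum + RowOverhead

  // Integer division, as the sizes are whole numbers of bytes.
  def estimateSize(childPhysicalSize: BigInt,
                   outputColumns: Seq[Int],
                   childColumns: Seq[Int]): BigInt =
    (childPhysicalSize * rowSize(outputColumns)) / rowSize(childColumns)

  def main(args: Array[String]): Unit = {
    val childColumns = Seq(IntSize, StringSize)
    // In-memory relation: 5 rows * (4 + 20) bytes = 120 bytes.
    val childSize = BigInt(5 * (IntSize + StringSize))

    // Same columns in and out: 120 * 32 / 32
    println(estimateSize(childSize, childColumns, childColumns)) // 120
    // Keeping only the integer column: 120 * 12 / 32
    println(estimateSize(childSize, Seq(IntSize), childColumns)) // 45
  }
}
```

The first call shows why a node that keeps every column leaves the estimate untouched: both row sizes are equal and cancel out.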

But this default formula is sometimes overridden by particular plans, for instance for:

- projection and filtering - for these 2 operations Spark can use cost-based optimization (covered in another post) or the default implementation presented above.
- intersect - the physical size corresponds to the size of the smaller of the intersected sides.
- except - this case excludes one dataset (called 'right') from the final result. Thus the statistics correspond to the statistics of the kept side (called 'left').
- union - as we could suppose, the physical size is the sum of the children's physical sizes, where the children are the combined plans (datasets).
- range - the physical size is equal to the number of elements in the range multiplied by the default size of LongType (8).
- limit - Spark computes a new physical size and row count. The row count is the minimum of the limit and the number of rows in the child plan.
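
The overrides listed above can be modelled with a few plain Scala functions. This is only a sketch: the names are hypothetical and the sizes are passed in directly instead of coming from real plans. The limit helper makes two assumptions that the list does not state: the output size is the row count multiplied by the row size including the overhead of 8, and the limit itself is used as the row count when the child row count is unknown.

```scala
// Simplified size estimates for operators overriding the default formula.
// All names are hypothetical; this is not Spark's code.
object OperatorSizeSketch {
  val LongSize = 8     // default size of LongType
  val RowOverhead = 8  // assumed per-row overhead, as in the default formula

  // intersect: size of the smaller side.
  def intersectSize(left: BigInt, right: BigInt): BigInt = left.min(right)

  // except: only the left side is kept.
  def exceptSize(left: BigInt, right: BigInt): BigInt = left

  // union: sum of the children sizes.
  def unionSize(children: Seq[BigInt]): BigInt = children.sum

  // range: number of elements * size of a long.
  def rangeSize(numElements: BigInt): BigInt = numElements * LongSize

  // limit: returns (rowCount, sizeInBytes).
  def limitStats(limit: Int,
                 childRows: Option[BigInt],
                 columnSizes: Seq[Int]): (BigInt, BigInt) = {
    val rows = childRows.map(_.min(BigInt(limit))).getOrElse(BigInt(limit))
    (rows, rows * (columnSizes.sum + RowOverhead))
  }

  def main(args: Array[String]): Unit = {
    println(intersectSize(300, 120))                 // 120
    println(exceptSize(300, 120))                    // 300
    println(unionSize(Seq(BigInt(100), BigInt(250)))) // 350
    println(rangeSize(1000))                         // 8000
    println(limitStats(3, Some(BigInt(10)), Seq(4, 8))) // (3,60)
  }
}
```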

The formulas for other classes can be found in the org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala file, which contains the case classes extending UnaryNode and sometimes overriding the default behavior.

## Statistics examples

It's time to see statistics behavior through real examples:

    private val sparkSession: SparkSession = SparkSession.builder().appName("Statistics test").master("local")
      .getOrCreate()

    import sparkSession.implicits._

    val shops = Seq(
      (1, "Shop_1"), (2, "Shop_2"), (3, "Shop_3"), (4, "Shop_4"), (5, "Shop_5")
    ).toDF("id", "name")

    val revenues = Seq(
      (1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)
    ).toDF("id", "revenue")

    override def afterAll(): Unit = {
      sparkSession.stop()
    }

    "statistics" should "remain unchanged for simple filter operation" in {
      val pairIdShops = shops.filter(shops("id")%2 === 0)

      val queryExecution = pairIdShops.queryExecution.toStringWithStats

      // Expected statistics:
      // == Optimized Logical Plan ==
      // Project [_1#2 AS id#5, _2#3 AS name#6], Statistics(sizeInBytes=120.0 B, hints=none)
      // +- Filter ((_1#2 % 2) = 0), Statistics(sizeInBytes=120.0 B, hints=none)
      //    +- LocalRelation [_1#2, _2#3], Statistics(sizeInBytes=120.0 B, hints=none)
      // As told in the post, the size doesn't change because the projection remains the same
      queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
      val queryExecutionWithoutFilter = shops.queryExecution.toStringWithStats
      queryExecutionWithoutFilter should include("Statistics(sizeInBytes=120.0 B, hints=none)")
    }

    "statistics" should "change after joining 2 datasets" in {
      val shopsWithRevenues = shops.join(revenues)

      val queryExecution = shopsWithRevenues.queryExecution.toStringWithStats

      // Expected plan is:
      // Join Inner, Statistics(sizeInBytes=4.7 KB, hints=none)
      // :- LocalRelation [id#25, name#26], Statistics(sizeInBytes=120.0 B, hints=none)
      // +- LocalRelation [id#35, revenue#36], Statistics(sizeInBytes=40.0 B, hints=none)
      queryExecution should include("Join Inner, Statistics(sizeInBytes=4.7 KB, hints=none)")
      queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
      queryExecution should include("Statistics(sizeInBytes=40.0 B, hints=none)")
    }

    "statistics for limit" should "decrease" in {
      val limitedShops = shops.limit(2)

      val queryExecution = limitedShops.queryExecution.toStringWithStats

      // Expected plan is:
      // == Optimized Logical Plan ==
      // GlobalLimit 2, Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)
      // +- LocalLimit 2, Statistics(sizeInBytes=120.0 B, hints=none)
      //    +- LocalRelation [id#45, name#46], Statistics(sizeInBytes=120.0 B, hints=none)
      queryExecution should include("GlobalLimit 2, Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)")
      queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
    }

    "statistics" should "increase when using union operator" in {
      val newShops = Seq(
        (11, "Shop_11"), (12, "Shop_12"), (13, "Shop_13"), (14, "Shop_14"), (15, "Shop_15")
      ).toDF("id", "Name")
      val allShops = shops.union(newShops)

      val queryExecution = allShops.queryExecution.toStringWithStats

      // Expected plan is:
      // == Optimized Logical Plan ==
      // Union, Statistics(sizeInBytes=240.0 B, hints=none)
      // :- LocalRelation [id#65, name#66], Statistics(sizeInBytes=120.0 B, hints=none)
      // +- LocalRelation [id#91, Name#92], Statistics(sizeInBytes=120.0 B, hints=none)
      queryExecution should include("Union, Statistics(sizeInBytes=240.0 B, hints=none)")
      queryExecution should include("Statistics(sizeInBytes=120.0 B, hints=none)")
    }
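
The sizes expected in the join test can be re-derived by hand. The sketch below does it in plain Scala with hypothetical helper names. It relies on one assumption that the post does not spell out: for a join without any specific estimation, the size is taken as the product of the children sizes, which is consistent with the 4.7 KB shown in the expected plan.

```scala
// Re-deriving the sizes from the join test; all names are hypothetical.
object ExamplePlanSizes {
  val IntSize = 4
  val StringSize = 20

  // In-memory relation: number of rows * sum of the default column sizes.
  def localRelationSize(rows: Int, columnSizes: Seq[Int]): BigInt =
    BigInt(rows) * columnSizes.sum

  // Assumption: a join without its own estimation multiplies the children sizes.
  def joinSize(children: Seq[BigInt]): BigInt = children.product

  def main(args: Array[String]): Unit = {
    val shops = localRelationSize(5, Seq(IntSize, StringSize))  // (id, name)
    val revenues = localRelationSize(5, Seq(IntSize, IntSize))  // (id, revenue)
    val joined = joinSize(Seq(shops, revenues))

    println(shops)                  // 120
    println(revenues)               // 40
    println(joined)                 // 4800
    println(joined.toDouble / 1024) // 4.6875, displayed as 4.7 KB in the plan
  }
}
```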

Statistics are a useful metric used by Spark to decide on some strategies. As we could see in the first section, Spark uses them to choose the join strategy, detect a star schema or even pick the side to limit when 2 datasets are joined. The second section showed the basic formulas used in the computation. As we could learn, the main formula is based on the number of elements in the dataset and on the children's physical size. But we could also see that the formula changes according to the node. The differences between these operations were partially demonstrated in the learning tests included in the last section.