Tables and Apache Spark

Versions: Apache Spark 3.2.1 https://github.com/bartosz25/spark-playground/tree/master/spark-tables

If you're like me and haven't had an opportunity to work with Spark on Hive, you're probably as confused about tables as I was. Hopefully, after reading this blog post you will understand the concept better!


Why confusing?

Why can reasoning about tables in Apache Spark be confusing? First, the concept is a bit hidden if you only use the programmatic API (.read("..."), .write("...")) to interact with the framework. Secondly, you might wrongly assume that tables are only related to Apache Hive and the legacy Hadoop stack, which doesn't sound appealing, does it? Finally, whoever talks about tables automatically implies SQL. Although SQL is a great language for solving many data engineering problems, it probably still looks less attractive than Python or Scala.

Nonetheless, with ACID file formats becoming more and more popular, it was crucial for me to shed some light on this table-related part of Apache Spark.

Table types

Apache Spark has 2 types of tables: internal and external. You can also find a different terminology calling them managed and unmanaged, but for simplicity I won't use it in this article.

So, what's the difference between these internal and external tables? Mainly the location of the data. Apache Spark manages the internal tables fully. Put another way, the framework is responsible for storing their data and metadata in the location defined in the spark.sql.warehouse.dir property. The scope of responsibility is different for the external tables where Apache Spark only deals with the metadata.

Therefore, you can already see the first implication: dropping an internal table removes both the data and the metadata, while dropping an external table impacts the metadata only.
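To make this difference concrete, here is a minimal toy model of the drop semantics, written in plain Python. It is only an illustration and doesn't use any Spark API; the class and table names are invented for the example:

```python
# Toy model of internal (managed) vs. external table drop semantics.
# Illustration only; not a Spark API.

class Catalog:
    def __init__(self):
        self.metadata = {}   # table name -> table type
        self.storage = {}    # table name -> data "files"

    def create_table(self, name, external=False):
        self.metadata[name] = "EXTERNAL" if external else "MANAGED"
        self.storage[name] = ["part-00000"]  # pretend some data was written

    def drop_table(self, name):
        table_type = self.metadata.pop(name)
        # Only managed (internal) tables get their data removed with the metadata
        if table_type == "MANAGED":
            del self.storage[name]

catalog = Catalog()
catalog.create_table("internal_users")
catalog.create_table("external_users", external=True)
catalog.drop_table("internal_users")
catalog.drop_table("external_users")

print("internal_users" in catalog.storage)  # False: data removed with the table
print("external_users" in catalog.storage)  # True: data survives the drop
```

The point of the model is only the asymmetry in drop_table: the metadata entry always goes away, the data only does for the managed table.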

Table creation

Technically speaking, the creation of internal and external tables differs. An external table requires the LOCATION parameter in its creation statement. If you omit it, you will get an error like the one in the snippet below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: CREATE EXTERNAL TABLE must be accompanied by LOCATION
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.org$apache$spark$sql$catalyst$analysis$ResolveSessionCatalog$$buildCatalogTable(ResolveSessionCatalog.scala:681)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:276)
    at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:48)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)

Optionally, you can use CREATE EXTERNAL TABLE instead of the simple CREATE TABLE. It's a way to protect yourself against creating an internal table in place of an external one when you forget to specify the LOCATION parameter.


Classes

Internally, the method creating tables is SessionCatalog#createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean, validateLocation: Boolean = true). It's called for both CREATE TABLE and CREATE TABLE LIKE statements. There is a difference, though: the latter builds its tableDefinition from the source table's metadata in the catalog:

case class CreateTableLikeCommand( // ...
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
// ...
    val newTableSchema = CharVarcharUtils.getRawSchema(sourceTableDesc.schema)
    val newTableDesc =
      CatalogTable(
        identifier = targetTable,
        tableType = tblType,
        storage = newStorage,
        schema = newTableSchema,
        provider = newProvider,
        partitionColumnNames = sourceTableDesc.partitionColumnNames,
        bucketSpec = sourceTableDesc.bucketSpec,
        properties = properties,
        tracksPartitionsInCatalog = sourceTableDesc.tracksPartitionsInCatalog)

    catalog.createTable(newTableDesc, ifNotExists)
    Seq.empty[Row]
  }

However, the createTable method doesn't create the table directly. It delegates this responsibility to an ExternalCatalog instance. The implementation in use is wrapped by the ExternalCatalogWithListener class, which triggers events for catalog interactions. In our case, it posts CreateTablePreEvent and CreateTableEvent before and after the table creation:

  override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val db = tableDefinition.database
    val name = tableDefinition.identifier.table
    val tableDefinitionWithVersion =
      tableDefinition.copy(createVersion = org.apache.spark.SPARK_VERSION)
    postToAll(CreateTablePreEvent(db, name))
    delegate.createTable(tableDefinitionWithVersion, ignoreIfExists)
    postToAll(CreateTableEvent(db, name))
  }
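The delegation-with-events pattern used by ExternalCatalogWithListener can be sketched in a few lines of plain Python. The event names mimic Spark's, but everything else (the in-memory catalog, the events list) is invented for the illustration:

```python
# Sketch of the "catalog with listener" pattern: the wrapper posts a pre-event,
# delegates the real work, then posts a post-event. Illustration only.

events = []

class InMemoryCatalog:
    def __init__(self):
        self.tables = {}
    def create_table(self, db, name):
        self.tables[(db, name)] = {}

class CatalogWithListener:
    def __init__(self, delegate):
        self.delegate = delegate
    def create_table(self, db, name):
        events.append(("CreateTablePreEvent", db, name))
        self.delegate.create_table(db, name)  # the real creation happens here
        events.append(("CreateTableEvent", db, name))

catalog = CatalogWithListener(InMemoryCatalog())
catalog.create_table("default", "users")
print(events)
# [('CreateTablePreEvent', 'default', 'users'), ('CreateTableEvent', 'default', 'users')]
```

The wrapper never inspects the table definition itself; its only job is to surround the delegated call with the two notifications, exactly like the Scala snippet above.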

As you can notice in the snippet, there is yet another delegation, this time for the real table creation. The mysterious "delegate" variable references an ExternalCatalog instance, which in our example is HiveExternalCatalog.

Bad news: the internal vs. external table distinction doesn't happen in any of the aforementioned snippets. In fact, Apache Spark has a special object with all supported table types that is used in various places in the code to correctly classify the CatalogTable instance passed to the final createTable method:

object CatalogTableType {
  val EXTERNAL = new CatalogTableType("EXTERNAL")
  val MANAGED = new CatalogTableType("MANAGED")
  val VIEW = new CatalogTableType("VIEW")

  val tableTypes = Seq(EXTERNAL, MANAGED, VIEW)
}

One of the places referencing this CatalogTableType is DataFrameWriter, called for saveAsTable(...). As you can see, it checks whether a location was defined to mark the table as internal or external:

  private def createTable(tableIdent: TableIdentifier): Unit = {
    val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
    val tableType = if (storage.locationUri.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
    }

    val tableDesc = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = new StructType,
      provider = Some(source),
      partitionColumnNames = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec)

    runCommand(df.sparkSession, "saveAsTable")(
      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
  }
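The classification rule itself is tiny and can be mirrored in plain Python. The function below is an illustration, not the Spark API; it assumes the location, when present, arrives through a "path" entry in the writer's options, as in the Scala snippet above:

```python
# Sketch of DataFrameWriter's classification rule: a table created with an
# explicit location becomes EXTERNAL, otherwise it becomes MANAGED.
# Plain-Python illustration, not the Spark API.

def resolve_table_type(options: dict) -> str:
    location = options.get("path")  # assumed key carrying the storage location
    return "EXTERNAL" if location is not None else "MANAGED"

print(resolve_table_type({"path": "/tmp/users"}))  # EXTERNAL
print(resolve_table_type({}))                      # MANAGED
```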

The usage of these MANAGED/EXTERNAL types depends on the underlying catalog. For Hive, Apache Spark does 2 things. First, it converts the tableType to a valid Hive TableType. Second, it adds an "EXTERNAL" property set to "TRUE" if the tableType is CatalogTableType.EXTERNAL. Thanks to these 2 changes, the Hive client knows how to write the table in the catalog:

private[hive] class HiveClientImpl(
// ...

  override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
    verifyColumnDataType(table.dataSchema)
    client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
  }

  def toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable = {
    val hiveTable = new HiveTable(table.database, table.identifier.table)
    hiveTable.setTableType(toHiveTableType(table.tableType))
    // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
    // Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
    // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
    if (table.tableType == CatalogTableType.EXTERNAL) {
      hiveTable.setProperty("EXTERNAL", "TRUE")
    }
// ..

def toHiveTableType(catalogTableType: CatalogTableType): HiveTableType = {
  catalogTableType match {
    case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE
    case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE
    case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
    case t =>
      throw new IllegalArgumentException(
        s"Unknown table type is found at toHiveTableType: $t")
  }
}
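Both steps of the conversion can be condensed into a small plain-Python sketch (the dict-based "Hive table" is invented for the illustration; only the type names and the "EXTERNAL" = "TRUE" property come from the Scala code above):

```python
# Sketch of the CatalogTableType -> Hive conversion: map the type name and,
# for external tables, add the "EXTERNAL" = "TRUE" property so the Hive
# metastore doesn't silently turn the table into a managed one.
# Illustration only; not the HiveClientImpl API.

HIVE_TABLE_TYPES = {
    "EXTERNAL": "EXTERNAL_TABLE",
    "MANAGED": "MANAGED_TABLE",
    "VIEW": "VIRTUAL_VIEW",
}

def to_hive_table(catalog_table_type: str) -> dict:
    if catalog_table_type not in HIVE_TABLE_TYPES:
        raise ValueError(f"Unknown table type is found: {catalog_table_type}")
    hive_table = {"tableType": HIVE_TABLE_TYPES[catalog_table_type], "properties": {}}
    if catalog_table_type == "EXTERNAL":
        hive_table["properties"]["EXTERNAL"] = "TRUE"
    return hive_table

print(to_hive_table("EXTERNAL"))
# {'tableType': 'EXTERNAL_TABLE', 'properties': {'EXTERNAL': 'TRUE'}}
```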

Tables in Apache Spark are a great way to share access to a dataset. Depending on the dataset's character (open, vs. closed to the table's clients only), you can have Apache Spark manage only the metadata (external tables) or the metadata together with the data (internal tables). Their creation statements use different commands (CREATE TABLE vs. CREATE EXTERNAL TABLE) and parameters (LOCATION for the external table).
