Writing custom external catalog listeners in Apache Spark SQL

Versions: Apache Spark 2.4.3 https://github.com/bartosz25/spark-...ternalCatalogWithListenerTest.scala

While writing posts about customizing Apache Spark SQL through extensions, I found a method to define custom catalog listeners. Since it was my first contact with this feature, I decided to explore it before playing with it.

The post begins with an explanation of catalogs in the data processing context. The second part focuses on the Hive catalog, one of the catalogs natively supported in Apache Spark from the beginning. The third part covers the implementation details of external catalog listeners, while the last section gives an example of catalog listeners.

Data catalogs

In simple words, a data catalog is the place that stores information about your data. By information I mean the location of the data, comments, statistics or schema. In a more complete definition, we can say that a data catalog:

To wrap up, a data catalog is a place defining the characteristics of datasets, where end users can go to understand and collaborate on the data. Ideally, the datasets are enriched automatically, either by crawlers or by Machine Learning models.

Hive catalog and Apache Spark SQL

Why focus on Hive here? Because Hive was the first project from the Hadoop family, so de facto the first modern and open source Big Data platform, to introduce metadata management. Hive stores all information about tables, relations, partitions and schemas in a relational database called Hive Metastore. It works in 3 different modes:

But all of this is related to Hive. If you remember the evolution of the Hadoop ecosystem, over time more and more tools started to use the metastore. That was possible thanks to an extra layer on top of the metastore service called HCatalog. It provides an API that 3rd party tools can use to manage the metastore catalog.

Fine, but where does Apache Spark fit in? Apache Spark uses the catalog to perform many different operations. For instance, for any operation related to the structure of a table, the framework calls the underlying catalog. Among the different catalog implementations you will find ExternalCatalogWithListener, HiveExternalCatalog and InMemoryCatalog. I will focus on the first one in the next section, so let's briefly discuss the 2 remaining ones here.

HiveExternalCatalog is the catalog used when Hive support is enabled, i.e. when you call enableHiveSupport during SparkSession construction. It interacts with Hive metastore through Apache Spark's HiveClient. HiveClient is just a wrapper on top of Hive's metastore classes from the Hadoop package (org.apache.hadoop.hive.ql.metadata). The wrapper's role consists mainly in retrieving data from Hive metastore and converting it into the corresponding Apache Spark classes. Aside from that, the implementation of this interface (HiveClientImpl) has other interesting things like Shims, but I will cover them in one of the next posts.

Regarding InMemoryCatalog, it's reserved for testing and exploration purposes and shouldn't be used for production workloads because of its ephemeral (in-memory) character.

External catalog listeners

Apache Spark 2.4.0 transformed the abstract ExternalCatalog class into a trait and brought the ExternalCatalogWithListener class implementing event-based logic. The main difference of this implementation, which you can see directly in the name, is its event-based character. ExternalCatalogWithListener integrates with Apache Spark's listener bus, which is responsible for sending the data to registered listeners. You can see that in SharedState, where the catalog is initialized:

    // Wrap to provide catalog events
    val wrapped = new ExternalCatalogWithListener(externalCatalog)

    // Make sure we propagate external catalog events to the spark listener bus
    wrapped.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = {
        sparkContext.listenerBus.post(event)
      }
    })

As you can notice, our catalog is a wrapper. It delegates every operation to the wrapped catalog and notifies the listeners, nothing more, nothing less. And how is the wrapped class resolved? The resolution comes from an internal configuration property called spark.sql.catalogImplementation. Its value, hive or in-memory, points to one of the 2 previously covered catalogs, HiveExternalCatalog or InMemoryCatalog. The former is used when Hive support is enabled whereas the latter is Apache Spark's default.
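
A quick way to see which implementation a session resolved is to read the property back. The snippet below is only a sketch: the object name, the application name and the local master are my assumptions, and it supposes that no other SparkSession was created earlier in the same JVM.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not a part of Apache Spark.
object CatalogImplementationCheck {

  def catalogImplementation(): String = {
    // Local session built without enableHiveSupport(); the app name is arbitrary.
    val spark = SparkSession.builder()
      .appName("catalog-implementation-check")
      .master("local[1]")
      .getOrCreate()
    try {
      // Without enableHiveSupport() the default value should be returned.
      // Calling enableHiveSupport() on the builder (with Hive classes on the
      // classpath) would switch the property to "hive".
      spark.conf.get("spark.sql.catalogImplementation")
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"Catalog implementation: ${catalogImplementation()}")
  }
}
```

The property is static, so it can be read at runtime but has to be set before the session is created.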

Example

To see catalog listeners in action, we need to register a listener in the listener bus with the org.apache.spark.SparkContext#addSparkListener(SparkListenerInterface) method. In the example I will simply create a new table and ensure that I'm getting 2 events, one before and another one after the CREATE TABLE query execution. To understand why these 2 particular events are sent, we must check the implementation of ExternalCatalogWithListener's createTable method:

  override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val db = tableDefinition.database
    val name = tableDefinition.identifier.table
    val tableDefinitionWithVersion =
      tableDefinition.copy(createVersion = org.apache.spark.SPARK_VERSION)
    postToAll(CreateTablePreEvent(db, name))
    delegate.createTable(tableDefinitionWithVersion, ignoreIfExists)
    postToAll(CreateTableEvent(db, name))
  }
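
The notify, delegate, notify sequence above can be reproduced without any Apache Spark dependency. The following is a simplified model of it, not Spark's code: all the names (SimpleCatalog, ListBackedCatalog, ListeningCatalog and the 2 event classes) are made up for the illustration, and listeners are called synchronously instead of going through a listener bus.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for Spark's classes; every name here is hypothetical.
sealed trait CatalogEvent
case class CreateTablePre(database: String, table: String) extends CatalogEvent
case class CreateTableDone(database: String, table: String) extends CatalogEvent

trait SimpleCatalog {
  def createTable(database: String, table: String): Unit
}

// Plays the role of the wrapped (delegate) catalog.
class ListBackedCatalog extends SimpleCatalog {
  private val tables = ListBuffer.empty[(String, String)]

  override def createTable(database: String, table: String): Unit = {
    if (tables.contains((database, table))) {
      throw new IllegalStateException(s"Table $database.$table already exists")
    }
    tables += ((database, table))
  }
}

// Plays the role of ExternalCatalogWithListener: notify, delegate, notify.
class ListeningCatalog(delegate: SimpleCatalog) extends SimpleCatalog {
  private val listeners = ListBuffer.empty[CatalogEvent => Unit]

  def addListener(listener: CatalogEvent => Unit): Unit = listeners += listener

  private def postToAll(event: CatalogEvent): Unit = listeners.foreach(_.apply(event))

  override def createTable(database: String, table: String): Unit = {
    postToAll(CreateTablePre(database, table))
    // If the delegate throws, the event below is never posted.
    delegate.createTable(database, table)
    postToAll(CreateTableDone(database, table))
  }
}

object ListeningCatalogDemo {
  def run(): List[CatalogEvent] = {
    val received = ListBuffer.empty[CatalogEvent]
    val catalog = new ListeningCatalog(new ListBackedCatalog)
    catalog.addListener(event => received += event)

    catalog.createTable("default", "orders")
    // The second call fails in the delegate, so only the pre-event is recorded.
    try catalog.createTable("default", "orders")
    catch { case _: IllegalStateException => () }

    received.toList
  }
}
```

The model also makes one property of the real method visible: a pre-event doesn't guarantee that the operation succeeded, so a listener interested only in completed operations should react to the second event.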

Let's check now how to add a listener for these events:

  "create table events" should "be caught by the listener" in {
    val catalogEvents = new scala.collection.mutable.ListBuffer[ExternalCatalogEvent]()
    TestedSparkSession.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = {
        event match {
          case externalCatalogEvent: ExternalCatalogEvent => catalogEvents.append(externalCatalogEvent)
          case _ => () // ignore events that don't come from the catalog
        }
      }
    })
    val tableName = s"orders${System.currentTimeMillis()}"
    val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1")).toDF("order_id", "user_id")

    orders.write.mode(SaveMode.Overwrite).bucketBy(2, "user_id").saveAsTable(tableName)


    // A typed pattern replaces the isInstanceOf/asInstanceOf pair
    val createTablePreEvent = catalogEvents.collectFirst {
      case event: CreateTablePreEvent => event
    }
    createTablePreEvent shouldBe defined
    createTablePreEvent.get shouldEqual CreateTablePreEvent("default", tableName)
    // A typed pattern replaces the isInstanceOf/asInstanceOf pair
    val createTableEvent = catalogEvents.collectFirst {
      case event: CreateTableEvent => event
    }
    createTableEvent shouldBe defined
    createTableEvent.get shouldEqual CreateTableEvent("default", tableName)
  }
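
The same idea can be generalized to a reusable listener that reacts to any catalog event instead of a single pair. The sketch below relies on the TableEvent and DatabaseEvent traits from org.apache.spark.sql.catalyst.catalog. CatalogAuditListener, CatalogAuditDemo and the table name are hypothetical, and I assume here that stopping the session delivers the events still waiting in the asynchronous listener bus.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{DatabaseEvent, TableEvent}

// Hypothetical listener; it records one readable line per catalog event.
class CatalogAuditListener extends SparkListener {
  // The listener bus calls onOtherEvent from its own thread, hence a concurrent queue.
  val seen = new ConcurrentLinkedQueue[String]()

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    // TableEvent extends DatabaseEvent, so it has to be matched first.
    case tableEvent: TableEvent =>
      seen.add(s"${tableEvent.getClass.getSimpleName}: ${tableEvent.database}.${tableEvent.name}")
    case databaseEvent: DatabaseEvent =>
      seen.add(s"${databaseEvent.getClass.getSimpleName}: ${databaseEvent.database}")
    case _ => () // not a catalog event
  }
}

object CatalogAuditDemo {
  def run(): Seq[String] = {
    val spark = SparkSession.builder()
      .appName("catalog-audit")
      .master("local[1]")
      .getOrCreate()
    val listener = new CatalogAuditListener
    spark.sparkContext.addSparkListener(listener)

    // Unique table name, as in the test above, to avoid clashes between runs.
    val tableName = s"audit_demo_${System.currentTimeMillis()}"
    spark.range(3).write.saveAsTable(tableName)
    spark.sql(s"DROP TABLE $tableName")

    // Events are delivered asynchronously; stopping the session is assumed
    // to flush the pending ones before the queue is read.
    spark.stop()
    listener.seen.asScala.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Matching on the traits rather than on concrete case classes keeps the listener short, at the price of losing the details specific to each event type.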

A data catalog is probably not something you'll explore at the very beginning of your Apache Spark adventure. But it's worth your attention because it facilitates a lot of tedious tasks like automatic data discovery. Apache Spark comes with 2 implementations of the catalog: an in-memory one, mostly used for debugging and local development, and a more production-ready one backed by Hive metastore. Both are wrapped by an external catalog with a built-in listener which can help to catch catalog-related events.