Dark data and data discovery in Apache Spark SQL

Versions: Apache Spark 2.4.3

Preparing an AWS exam is not only a good way to discover AWS services but also more general concepts. It happened to me when I first heard about dark data during a talk presenting AWS Glue.

In the first section of this post I will define dark data. Before watching the talk, I didn't know this concept and it's important to follow the remaining parts. In the next part, I will show a way that addresses the issues brought by dark data.

Dark data

We live in a "data" era and nobody doubts. The volume of produced data increases every day, it's produced not only by human actions (eg. order on an e-commerce store) but also by machines (eg. server logs). The problem with that is not the volume because we're able to store them with cloud "virtually unlimited" object stores. The problem is with the data itself and its discoverability.

Few years ago the term "dark data" was proposed to describe such types of datasets, ie. the datasets which aren't collected at all, the datasets which are collected but are totally or partially unexploitable. Just after that, different solutions started to emerge to address these issues.

Hence, by "discoverability" I meant not only metadata exposition from a catalog but also the ability to detect and expose new datasets or update already existing datasets with new or modified properties. If you're a lucky user of AWS, you can benefit from their managed service called Glue to fulfill these requirements. If not, I will try to see whether a similar thing can be easily set up with Apache Spark.

Data discoverability and AWS Glue

AWS Glue is a managed service composed of 3 main parts: jobs, crawlers, and schedulers. For data discoverability, we'll focus on the part of the crawler. The crawler is an application built on top of Apache Spark with a source-closed abstraction called DynamicFrames (I wrote about them in DataFrames for analytics - Glue DynamicFrame). The main goal of this component is to populate data catalog, so add new tables or update the schemas of the existing ones.

Data discovery is not in real-time, ie. the crawler doesn't run continuously. Instead, it's more like a batch or micro-batch because you can define different scheduling policies, going from 5 minutes (minimum frequency) to months. You can also execute it on demand and, for instance, start the crawler in an event-based manner by using startCrawler method of the SDK.

Every time a crawler runs, it persists the information about the updated elements, like in the following logs:

BENCHMARK : Running Start Crawl for Crawler test-crawler
BENCHMARK : Classification complete, writing results to database test_crawler_db
INFO : Crawler configured with Configuration
{
    "Version": 1,
    "CrawlerOutput": {
        "Partitions": {
            "AddOrUpdateBehavior": "InheritFromTable"
        }
    }
}
 and SchemaChangePolicy
{
    "UpdateBehavior": "UPDATE_IN_DATABASE",
    "DeleteBehavior": "DEPRECATE_IN_DATABASE"
}
. Note that values in the Configuration override values in the SchemaChangePolicy for S3 Targets.
INFO : Table test_crawler_table in database test_crawler_db has been updated with new schema
INFO : Created partitions with values [[2020, 02, 21, 14], [2020, 02, 21, 15]] for table test_crawler_table in database test_crawler_db
BENCHMARK : Finished writing to Catalog
BENCHMARK : Crawler has finished running and is in state READY

In the log you can see 2 important properties, UpdateBehavior and DeleteBehavior. They define how the crawler should behave when it encounters changes. Regarding the former attribute, it defines what happens when some changes in the schema are detected. With UPDATE_IN_DATABASE policy, the crawler will apply the changes directly to the Data Catalog. The second possibility is LOG and as you can deduce, the crawler will simply log the message about the modifications. Regarding DeleteBehavior, it applies to the tables. If one of the tables is not any more physically stored on S3 (or any other supported data source), you can instruct the crawler to delete this object (DELETE_FROM_DATABASE), simply log a message (LOG) or mark the table as deprecated (DEPRECATE_IN_DATABASE).

How are the schemas created? Well, it depends on the discovered data similarity. If the crawler was in charge of multiple data sources and it detected very similar schemas for the sources that are compatible (eg. same format, same compression), it may create a single table in the data catalog. If you want to enforce this behavior, you can set TableGroupingPolicy property to CombineCompatibleSchemas. In that case, the crawler will only verify data sources compatibility and use all of them to create one common schema in the catalog.

Data catalog and discovery with Apache Spark

Before writing this article I was looking for an Open Source implementation of the service similar to AWS Glue. I found projects like Apache Atlas, Amundsen, or still Hive, but none of them provided automatic data discovery as the crawlers do. (If I'm wrong, please comment, I didn't spend my whole day analyzing their features, so maybe I missed something).

That's why I wanted to check how this crawling could be implemented with Apache Spark. As you know, Apache Spark SQL provides automatic schema inference (I wrote a few posts about that topic under Spark SQL schema tag). To keep things concise, let's focus on semi-structured format like JSON. So the goal of the discovery program is to crawl all files from the specified location and automatically resolve the schema, without any extra processing logic. Let's see this first part of the solution:

val sparkSession: SparkSession = SparkSession.builder()
.master("local[*]").appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", "/tmp/metastore")
.enableHiveSupport().getOrCreate()

val testDataset1 =
"""
  |{"letter": "A", "geo": {"city": "Paris", "country": "France"}}
  |{"number": 2, "geo": "Poland"}
  |{"letter": "C", "number": 3}
""".stripMargin

FileUtils.writeStringToFile(new File("/tmp/input_json1.json"), testDataset1)

val tableName = s"discovered_table${System.currentTimeMillis()}"

// Let's run the discovery for the first time:
val inputDataSchema1 = sparkSession.read.option("samplingRatio", "1.0")
  .json("/tmp/input_json1.json").schema

An important point is to define the sampling ratio to 100%. Otherwise, the framework can skip some files and do not include them in schema resolution.

It included all fields and also found the most common type for "geo" attribute that sometimes is represented as a structure, sometimes as a string. Let's go now to the part about schema creation:

  val catalog = sparkSession.catalog
  import scala.collection.JavaConverters._
  catalog.createTable(tableName, "json", inputDataSchema1, Map.empty[String, String].asJava)

So nothing complicated. I'm simply creating a table from Apache Spark's Catalog class and the schema inferred before. The funny part starts now, when we already have a table and need to merge the schema. StructType of our schema has a method to merge schemas and choose the most common type for a given field. The problem is the visibility of this method (package-private). A simple trick to expose it in a mirrored package, outside Apache Spark lib should help:

package org.apache.spark.sql

import org.apache.spark.sql.types.StructType

object SchemasMerger {

  def merge(schema1: StructType, schema2: StructType): StructType = {
    schema1.merge(schema2)
  }

}

And the merge use looks like:

  FileUtils.writeStringToFile(new File("/tmp/input_json2.json"), testeDataset2)
  val testeDataset2 =
    s"""
      |{"letter": "d", "upper_letter": "D"}
    """.stripMargin
  val inputDataSchema2 = sparkSession.read.option("samplingRatio", "1.0")
    .json("/tmp/input_json2.json").schema
  val existingSchema = sparkSession.sql(s"SELECT * FROM ${tableName}").schema
  val mergedSchema = SchemasMerger.merge(existingSchema, inputDataSchema2)

  sparkSession.sql(s"DROP TABLE IF EXISTS ${tableName}")
  catalog.createTable(tableName, "json", mergedSchema, Map.empty[String, String].asJava)

And now I'm able to manipulate manage data discoverability natively with Apache Spark SQL, as demonstrated in this short movie:

To be honest, when I was thinking about this blog post, I wanted to test Apache Atlas or any other "metadata" tool to imitate the AWS Glue service. However, after a quick analysis of their architectures I said to myself that it wasn't easy to explain the concepts in a single blog post (so maybe in a series the second half of the year?) and that not everything existed. That's why I decided to go with Apache Spark SQL and hopefully found a little bit hacky method to manage the data catalog automatically, at least effort. I'm curious if you have any other tools or Spark implementation to do that more easily. Of course, the problem with that approach is the possibility to visualize and enrich the catalog by humans. But for that Atlas or Amundsen should do the job :)