Data validation frameworks - Deequ and Apache Griffin overview

Versions: Deequ 1.0.2, Apache Griffin 0.5.0

Poor data quality is a major pain point for data workers. Data engineers often have to deal with inconsistent JSON schemas, data analysts have to track down dataset issues to avoid biased reports, and data scientists spend a big part of their time preparing data for training instead of dedicating it to model optimization. That's why having a good tool to control data quality is so important.

Even though on my blog I used to write about Cerberus, there are a few other data validation frameworks that you can use. In this post, I will cover 2 of the available solutions, Deequ and Apache Griffin. Why these 2? First, Deequ has been in my backlog for several months and it's time to see what it can do. Second, Apache Griffin, even though I only discovered this after analyzing the tool, brings a slightly different validation paradigm. I omit Cerberus here because I've already covered it in other articles (Validating JSON with Apache Spark and Cerberus, Extended JSON validation with Cerberus - error definition and normalization).

Deequ

Deequ is a project backed by AWS which, according to the documentation, is used internally at Amazon to validate data quality. Among its main characteristics, you will find:
- constraint suggestion, i.e. automated profiling of a dataset to propose validation rules
- constraint verification, i.e. declarative checks producing a data quality report
- metrics computation, with the possibility to store the results in a metrics repository
- anomaly detection, comparing the newly computed metrics with the previously stored ones

Since it's quite hard to illustrate all of these use cases in one blog post, I will focus on 3 of them: constraint suggestion, anomaly detection and validation rules. You will find them in the test class following the dependency sketch below.
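Before moving to the tests, a word about the dependencies. The snippet below is only a minimal build.sbt sketch; the Spark and ScalaTest versions are my assumptions, only Deequ 1.0.2 comes from the header of this post:

// build.sbt - minimal sketch; Spark 2.4.x and ScalaTest 3.0.x are assumed versions,
// only Deequ 1.0.2 is the version referenced in this post
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4" % Provided,
  "com.amazon.deequ" % "deequ" % "1.0.2",
  "org.scalatest" %% "scalatest" % "3.0.8" % Test
)

With the dependencies in place, here is the test class covering the 3 features: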

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.{ConstraintStatus, NamedConstraint}
import com.amazon.deequ.metrics.DistributionValue
import com.amazon.deequ.profiles.{NumericColumnProfile, StandardColumnProfile}
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

class DeequTest extends FlatSpec with Matchers {

  private val testSparkSession: SparkSession = SparkSession.builder().appName("Deequ test").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._
  val orders = Seq(
    (1, "order#1", -3.9d, "CONFIRMED"),
    (2, "order#2", 3.9d, "CONFIRMED"),
    (3, "order#3", 23.9d, "CONFIRMED"),
    (4, "order#4", 34.9d, "PENDING"),
    (1, "order#1", -3.9d, "CONFIRMED"),
    (5, "order#5", 31.9d, "DELETED")
  ).toDF("id", "label", "amount", "status")

  behavior of "Deequ"

  it should "suggest constraints based on the data" in {
    val suggestionResult = ConstraintSuggestionRunner()
      .onData(orders)
      .addConstraintRules(Rules.DEFAULT)
      .run()

    val idColumnProfile = suggestionResult.columnProfiles("id")
    idColumnProfile shouldBe a [NumericColumnProfile]
    idColumnProfile.asInstanceOf[NumericColumnProfile].completeness shouldEqual 1d
    idColumnProfile.asInstanceOf[NumericColumnProfile].approximateNumDistinctValues shouldEqual 5
    idColumnProfile.asInstanceOf[NumericColumnProfile].minimum shouldEqual Some(1)
    idColumnProfile.asInstanceOf[NumericColumnProfile].maximum shouldEqual Some(5)
    val labelColumnProfile = suggestionResult.columnProfiles("label")
    labelColumnProfile shouldBe a [StandardColumnProfile]
    labelColumnProfile.asInstanceOf[StandardColumnProfile].typeCounts shouldEqual Map("Boolean" -> 0,
      "Fractional" -> 0, "Integral" -> 0, "Unknown" -> 0, "String" -> 6)
    val statusColumnProfile = suggestionResult.columnProfiles("status")
    statusColumnProfile shouldBe a [StandardColumnProfile]
    val statusHistogram = statusColumnProfile.asInstanceOf[StandardColumnProfile].histogram
    statusHistogram shouldBe defined
    statusHistogram.get.values shouldEqual Map(
      "CONFIRMED" -> DistributionValue(4,0.6666666666666666),
      "PENDING" -> DistributionValue(1,0.16666666666666666),
      "DELETED" -> DistributionValue(1,0.16666666666666666)
    )
  }

  it should "detect an error for too big dataset" in {
    val metricsRepository = new InMemoryMetricsRepository()
    // Set up the baseline metrics for the first time
    val nowKey = ResultKey(System.currentTimeMillis())
    VerificationSuite()
      .onData(orders)
      .addCheck(
        Check(CheckLevel.Error, "Ensure data quality")
          .hasSize(itemsCount => itemsCount == 5, Some("<SIZE>"))
      )
      .useRepository(metricsRepository)
      .saveOrAppendResult(nowKey)
      .run()

    // Here we use 14 orders because we want to see the anomaly detection
    // fail since the number of rows more than doubled
    val ordersOneMinuteLater = Seq(
      (1, "order#1", -3.9d, "CONFIRMED"),
      (2, "order#2", 3.9d, "CONFIRMED"),
      (3, "order#3", 23.9d, "CONFIRMED"),
      (4, "order#4", 34.9d, "PENDING"),
      (1, "order#1", -3.9d, "CONFIRMED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED")
    ).toDF("id", "label", "amount", "status")
    val ordersOneMinuteLaterKey = ResultKey(System.currentTimeMillis())
    val verificationResultOneMinuteLater = VerificationSuite()
      .onData(ordersOneMinuteLater)
      .useRepository(metricsRepository)
      .saveOrAppendResult(ordersOneMinuteLaterKey)
      // We expect at most 2x increase
      .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2)), Size())
      .run()

    verificationResultOneMinuteLater.status shouldEqual CheckStatus.Warning
    // You can also visualize the metrics from the repository
    metricsRepository
      .load()
      .forAnalyzers(Seq(Size()))
      .getSuccessMetricsAsDataFrame(testSparkSession)
      .show()
    // +-------+--------+----+-----+-------------+
    //| entity|instance|name|value| dataset_date|
    //+-------+--------+----+-----+-------------+
    //|Dataset|       *|Size|  6.0|1579069527000|
    //|Dataset|       *|Size| 14.0|1579069527611|
    //+-------+--------+----+-----+-------------+
  }

  it should "apply few validation rules" in {
    val nowKey = ResultKey(System.currentTimeMillis())
    val verificationResult = VerificationSuite()
      .onData(orders)
      .addCheck(
        Check(CheckLevel.Error, "Ensure data quality")
          // Some("<SIZE>") --> an optional hint added to the validation result message
          .hasSize(itemsCount => itemsCount == 5, Some("<SIZE>"))
          // Ensure uniqueness of the order id
          .isComplete("id").isUnique("id")
          // Ensure completeness (NOT NULL) of productName which is missing in the dataset!
          .isComplete("productName")
          // Ensure all statuses are contained in the array
          .isContainedIn("status", Array("CONFIRMED", "DELETED", "PENDING"))
          // Ensure the amount is non-negative and that its maximum value is exactly 100
          .isNonNegative("amount").hasMax("amount", amount => amount == 100d)
      )
      .run()
    verificationResult.status shouldEqual CheckStatus.Error

    val resultsForAllConstraints = verificationResult.checkResults
      .flatMap { case (_, checkResult) => checkResult.constraintResults }
    val successfulConstraints = resultsForAllConstraints.filter(result => result.status == ConstraintStatus.Success)
      .map(result => result.constraint.asInstanceOf[NamedConstraint].toString())
    successfulConstraints should have size 2
    successfulConstraints should contain allOf(
      "CompletenessConstraint(Completeness(id,None))",
      "ComplianceConstraint(Compliance(status contained in CONFIRMED,DELETED,PENDING,`status` IS " +
        "NULL OR `status` IN ('CONFIRMED','DELETED','PENDING'),None))"
    )

    val failedConstraints = resultsForAllConstraints.filter(result => result.status != ConstraintStatus.Success)
      .map(result => result.constraint.asInstanceOf[NamedConstraint].toString())
    failedConstraints should have size 5
    failedConstraints should contain allOf(
      "SizeConstraint(Size(None))",
      "UniquenessConstraint(Uniqueness(List(id)))",
      "CompletenessConstraint(Completeness(productName,None))",
      "ComplianceConstraint(Compliance(amount is non-negative,COALESCE(amount, 0.0) >= 0,None))",
      "MaximumConstraint(Maximum(amount,None))"
    )
  }
 
}
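
One nice feature of the constraint suggestion module that I didn't assert on in the first test is that every suggestion also exposes the Scala code of the corresponding check. The snippet below is a small sketch printing it, based on the ConstraintSuggestion API of Deequ 1.0.2 (constraintSuggestions, description and codeForConstraint); double-check the names against your version:

// Prints every suggested constraint together with the ready-to-paste check code.
// suggestionResult is the ConstraintSuggestionResult computed in the first test above.
suggestionResult.constraintSuggestions.foreach { case (column, suggestions) =>
  suggestions.foreach { suggestion =>
    println(s"Suggested constraint for '${column}': ${suggestion.description}")
    println(s"Check code: ${suggestion.codeForConstraint}")
  }
}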

Apache Griffin

The second framework I'd like to cover here is Apache Griffin. As far as I understand, it works differently than Deequ because it's more a platform that you configure than code that you write. To make it work, you have to define an environment file with the output sinks and a data quality file with the inputs and validation rules, both shown later in this post. Once they're defined, you can launch org.apache.griffin.measure.Application directly from spark-submit.

In a low-level view, Griffin is based on 3 main layers called define, measure and analyze. In the first one you define the scope of your validation, i.e. all rules, thresholds and goals. The next layer takes these definitions and applies them to one of the available data sources (Hadoop, RDBMS, Kafka). The measure layer generates metrics that are later stored in the metrics repository of the analyze layer, which is the exposition part used to visualize and eventually fine-tune the measures. A nice thing about Griffin is that it offers a unified model for batch and streaming data validation.

In my test I will use a standalone Apache Spark 2.2.0 distribution. To run Griffin, you can also use the Docker images from GitHub, but launching it standalone was a faster option for me. After downloading the measure JAR from the Maven central repository, I generated 2 datasets with this code:

object GriffinEmptyDataGenerator extends App {

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Generate test data for Griffin test")
    .config("spark.sql.shuffle.partitions", "1")
    .master("local[*]").getOrCreate()
  import sparkSession.implicits._

  Seq(
    GriffinTestDataset(1L, None), GriffinTestDataset(2L, Some("test")),
    GriffinTestDataset(3L, Some("test 2")), GriffinTestDataset(4L, None),
    GriffinTestDataset(5L, Some("test 3")), GriffinTestDataset(5L, Some("test 3")), GriffinTestDataset(6L, None)
  ).toDF().coalesce(1).write.mode(SaveMode.Overwrite).format("avro").save("/tmp/source")
  Seq(
    GriffinTestDataset(1L, None), GriffinTestDataset(2L, Some("test")),
    GriffinTestDataset(3L, Some("test 2")), GriffinTestDataset(4L, None),
    GriffinTestDataset(5L, Some("test 3"))
  ).toDF().coalesce(1).write.mode(SaveMode.Overwrite).format("avro").save("/tmp/target")

}

case class GriffinTestDataset(id: Long, label: Option[String])
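
A side note about the "avro" format used above: depending on your Spark version, the Avro data source may not be bundled with the distribution. On Spark 2.4+ it lives in an external module that you would have to add yourself, for example (the version below is an assumption on my side, adapt it to your build):

// required only if your Spark distribution doesn't ship the Avro data source
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.4"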

After that, I noted the names of both generated files and put them into the data quality file, created this way (inspired by the official repository):

{
  "name": "dist_batch",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "/tmp/source/part-00000-d2f17312-11e2-4e23-854a-c0e004d160e4-c000.avro"
          }
        }
      ]
    },
    {
      "name": "target",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "/tmp/target/part-00000-b18c5621-2458-46e8-8958-e12488c5cd42-c000.avro"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "distinct",
        "out.dataframe.name": "dist",
        "rule": "id",
        "details": {
          "source": "source",
          "target": "target",
          "total": "total",
          "distinct": "distinct",
          "dup": "dup",
          "num": "num",
          "duplication.array": "dup"
        },
        "out":[
          {
            "type": "metric",
            "name": "distinct"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE"]
}

As you can see, I want to check for duplicates. The last file you have to create defines the environment and it looks like this:

{
  "spark": {
    "log.level": "INFO",
    "config": {
      "spark.master": "local[*]"
    }
  },
  "sinks": [
    {
      "type": "console"
    }
  ]
}

After executing Griffin with spark-submit, you should see a few measures in the logs:

bartosz:~/programming/spark-2.2.0-bin-hadoop2.7/bin$ ./spark-submit --class org.apache.griffin.measure.Application --master local[*]  --driver-memory 1g --executor-memory 1g --num-executors 2 /home/bartosz/workspace/griffin_test/measure-0.5.0.jar /home/bartosz/workspace/griffin_test/env.json /home/bartosz/workspace/griffin_test/data_quality_csv.json

The measures you should see are:

INFO DAGScheduler: Job 5 finished: collect at MetricWriteStep.scala:79, took 2.472455 s
dist_batch [1579100143771] metrics:
{"name":"dist_batch","tmst":1579068286000,"value":{"total":7,"distinct":6,"dup":[{"dup":1,"num":1}]},"applicationId":"local-1579068286000"}
[1579068286000] 1579068286100: process using time: 12177 ms

In this article you can see that Cerberus, so often mentioned on this blog, is not the only data quality framework. Deequ and Apache Griffin are 2 alternatives. The former favors a Scala DSL executed on top of Apache Spark. Griffin, on the other hand, prefers configuration over code and, IMHO, it's a little bit harder to get started with. Initially I wanted to run my test on CSV files which, according to the GitHub documentation, are supported. Unfortunately, I didn't manage to make them work because of a "data source not found" error. It only worked when I took Avro and the example defined in the /resources directory of the project. But do not consider this as guidance. It's only a personal experience after a few hours of playing with both solutions. Try them by yourself and make your own opinion of them!