Data validation frameworks - Deequ and Apache Griffin overview

on waitingforcode.com

Poor data quality is a major source of pain for everyone working with data. Data engineers often have to deal with inconsistent JSON schemas, data analysts have to track down dataset issues to avoid biased reports, and data scientists spend a lot of time preparing data for training instead of dedicating that time to model optimization. That's why having a good tool to control data quality is very important.

Even though I have already written about Cerberus on this blog, there are a few other data validation frameworks that you can use. In this post, I will cover 2 of the available solutions, Deequ and Apache Griffin. Why these 2? First, Deequ has been in my backlog for several months and it's time to see what it can do. Second, Apache Griffin brings a slightly different validation paradigm, even though I only discovered that after analyzing the tool. I omit Cerberus here because I've already covered it in other articles (Validating JSON with Apache Spark and Cerberus, Extended JSON validation with Cerberus - error definition and normalization).

Deequ

Deequ is a project backed by AWS which, according to the documentation, is used internally at Amazon to validate data quality. Among its main characteristics, you will find:

  • Apache Spark support - it's built on top of Apache Spark, so you can use it with any data source supported by the framework!
  • anomaly detection - sometimes you will need to compare the current dataset with a previously analyzed one, for instance, to detect data regressions in terms of dataset size. Deequ comes with a native implementation of this feature.
  • global and unitary checks - you can verify attributes of the whole dataset, like the number of rows, as well as every column with a fine-grained rule.
  • metrics storage - Deequ has a concept of a repository where you can output generated metrics. Thanks to the repository you can query the metrics and visualize them on a dashboard.
  • dashboard - it's not natively integrated with the framework. You will need to add an extra layer.
  • evolving datasets - when you need to measure metrics over an evolving dataset, for example increasing in size every day, you can use a state store to persist the measures on an HDFS-compatible file system (object storage) or simply in memory.
  • constraints suggestion - that's one of the features that surprised me the most. Deequ can analyze your whole dataset or only a part of it, and suggest validation constraints based on what it finds.
  • profiling - last but not least, data profiling is also an interesting feature of Deequ. The profiling will give you information about one specific column, like its completeness (missing values), the number of distinct values or the data type.
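
To make the profiling vocabulary more concrete, here is a minimal plain-Scala sketch of what completeness, distinct count and a value histogram mean for a single column. It does not use Deequ at all: the object and the helper names (ProfileSketch, completeness, distinctCount, histogram) are made up for this illustration, and Deequ computes such metrics on top of Apache Spark, with an approximate rather than exact distinct count.

```scala
// Plain Scala illustration of column profiling metrics; not Deequ's implementation.
object ProfileSketch {

  // Share of defined (non-missing) values in a column, between 0 and 1.
  // An empty column is treated as complete, which is a choice of this sketch.
  def completeness[A](column: Seq[Option[A]]): Double =
    if (column.isEmpty) 1.0 else column.count(_.isDefined).toDouble / column.size

  // Exact number of distinct defined values.
  def distinctCount[A](column: Seq[Option[A]]): Int = column.flatten.distinct.size

  // Absolute count and ratio (relative to all rows) for every defined value.
  def histogram[A](column: Seq[Option[A]]): Map[A, (Long, Double)] = {
    val total = column.size.toDouble
    column.flatten.groupBy(identity).map { case (value, occurrences) =>
      value -> ((occurrences.size.toLong, occurrences.size / total))
    }
  }

  def main(args: Array[String]): Unit = {
    // A status column with 6 rows and 3 different values
    val statuses = Seq("CONFIRMED", "CONFIRMED", "CONFIRMED", "PENDING", "CONFIRMED", "DELETED")
      .map(Option(_))
    println(completeness(statuses))
    println(distinctCount(statuses))
    println(histogram(statuses))
  }
}
```

The histogram keeps both the absolute count and the ratio for every value, which is the same kind of information a profile exposes for low-cardinality columns.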

Since it's quite hard to illustrate all of these use cases in one blog post, I will focus on 3 of them: constraints suggestion, anomaly detection and validation rules:

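// Package locations assumed to match the Deequ 1.0.x and ScalaTest 3.0.x releases; adjust them for other versions
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.{ConstraintStatus, NamedConstraint}
import com.amazon.deequ.metrics.DistributionValue
import com.amazon.deequ.profiles.{NumericColumnProfile, StandardColumnProfile}
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

class DeequTest extends FlatSpec with Matchers {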

  private val testSparkSession: SparkSession = SparkSession.builder().appName("Deequ test").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._
  val orders = Seq(
    (1, "order#1", -3.9d, "CONFIRMED"),
    (2, "order#2", 3.9d, "CONFIRMED"),
    (3, "order#3", 23.9d, "CONFIRMED"),
    (4, "order#4", 34.9d, "PENDING"),
    (1, "order#1", -3.9d, "CONFIRMED"),
    (5, "order#5", 31.9d, "DELETED")
  ).toDF("id", "label", "amount", "status")

  behavior of "Deequ"

  it should "suggest constraints based on the data" in {
    val suggestionResult = ConstraintSuggestionRunner()
      .onData(orders)
      .addConstraintRules(Rules.DEFAULT)
      .run()

    val idColumnProfile = suggestionResult.columnProfiles("id")
    idColumnProfile shouldBe a [NumericColumnProfile]
    idColumnProfile.asInstanceOf[NumericColumnProfile].completeness shouldEqual 1d
    idColumnProfile.asInstanceOf[NumericColumnProfile].approximateNumDistinctValues shouldEqual 5
    idColumnProfile.asInstanceOf[NumericColumnProfile].minimum shouldEqual Some(1)
    idColumnProfile.asInstanceOf[NumericColumnProfile].maximum shouldEqual Some(5)
    val labelColumnProfile = suggestionResult.columnProfiles("label")
    labelColumnProfile shouldBe a [StandardColumnProfile]
    labelColumnProfile.asInstanceOf[StandardColumnProfile].typeCounts shouldEqual Map("Boolean" -> 0,
      "Fractional" -> 0, "Integral" -> 0, "Unknown" -> 0, "String" -> 6)
    val statusColumnProfile = suggestionResult.columnProfiles("status")
    statusColumnProfile shouldBe a [StandardColumnProfile]
    val statusHistogram = statusColumnProfile.asInstanceOf[StandardColumnProfile].histogram
    statusHistogram shouldBe defined
    statusHistogram.get.values shouldEqual Map(
      "CONFIRMED" -> DistributionValue(4,0.6666666666666666),
      "PENDING" -> DistributionValue(1,0.16666666666666666),
      "DELETED" -> DistributionValue(1,0.16666666666666666)
    )
  }

  it should "detect an error for too big dataset" in {
    val metricsRepository = new InMemoryMetricsRepository()
    // Setup measures for the first time
    val nowKey = ResultKey(System.currentTimeMillis())
    VerificationSuite()
      .onData(orders)
      .addCheck(
        Check(CheckLevel.Error, "Ensure data quality")
          .hasSize(itemsCount => itemsCount == 5, Some("<SIZE>"))
      )
      .useRepository(metricsRepository)
      .saveOrAppendResult(nowKey)
      .run()

    // Here we try with 14 orders because we want to see the anomaly detection
    // fail because of the multiplied number of rows
    val ordersOneMinuteLater = Seq(
      (1, "order#1", -3.9d, "CONFIRMED"),
      (2, "order#2", 3.9d, "CONFIRMED"),
      (3, "order#3", 23.9d, "CONFIRMED"),
      (4, "order#4", 34.9d, "PENDING"),
      (1, "order#1", -3.9d, "CONFIRMED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED"),
      (5, "order#5", 31.9d, "DELETED")
    ).toDF("id", "label", "amount", "status")
    val ordersOneMinuteLaterKey = ResultKey(System.currentTimeMillis())
    val verificationResultOneMinuteLater = VerificationSuite()
      .onData(ordersOneMinuteLater)
      .useRepository(metricsRepository)
      .saveOrAppendResult(ordersOneMinuteLaterKey)
      // We expect at most 2x increase
      .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2)), Size())
      .run()

    verificationResultOneMinuteLater.status shouldEqual CheckStatus.Warning
    // You can also visualize the metrics from the repository
    metricsRepository
      .load()
      .forAnalyzers(Seq(Size()))
      .getSuccessMetricsAsDataFrame(testSparkSession)
      .show()
    // +-------+--------+----+-----+-------------+
    // | entity|instance|name|value| dataset_date|
    // +-------+--------+----+-----+-------------+
    // |Dataset|       *|Size|  6.0|1579069527000|
    // |Dataset|       *|Size| 14.0|1579069527611|
    // +-------+--------+----+-----+-------------+
  }

  it should "apply few validation rules" in {
    val nowKey = ResultKey(System.currentTimeMillis())
    val verificationResult = VerificationSuite()
      .onData(orders)
      .addCheck(
        Check(CheckLevel.Error, "Ensure data quality")
          // Some("<SIZE>") --> custom prefix that you can add to the validation result message
          .hasSize(itemsCount => itemsCount == 5, Some("<SIZE>"))
          // Ensure completeness (NOT NULL) and uniqueness of the order id
          .isComplete("id").isUnique("id")
          // Ensure completeness (NOT NULL) of productName which is missing in the dataset!
          .isComplete("productName")
          // Ensure all statuses are contained in the array
          .isContainedIn("status", Array("CONFIRMED", "DELETED", "PENDING"))
          // Ensure that no amount is negative and that the max amount is exactly 100
          .isNonNegative("amount").hasMax("amount", amount => amount == 100d)
      )
      .run()
    verificationResult.status shouldEqual CheckStatus.Error

    val resultsForAllConstraints = verificationResult.checkResults
      .flatMap { case (_, checkResult) => checkResult.constraintResults }
    val successfulConstraints = resultsForAllConstraints.filter(result => result.status == ConstraintStatus.Success)
      .map(result => result.constraint.asInstanceOf[NamedConstraint].toString())
    successfulConstraints should have size 2
    successfulConstraints should contain allOf(
      "CompletenessConstraint(Completeness(id,None))",
      "ComplianceConstraint(Compliance(status contained in CONFIRMED,DELETED,PENDING,`status` IS " +
        "NULL OR `status` IN ('CONFIRMED','DELETED','PENDING'),None))"
    )

    val failedConstraints = resultsForAllConstraints.filter(result => result.status != ConstraintStatus.Success)
      .map(result => result.constraint.asInstanceOf[NamedConstraint].toString())
    failedConstraints should have size 5
    failedConstraints should contain allOf(
      "SizeConstraint(Size(None))",
      "UniquenessConstraint(Uniqueness(List(id)))",
      "CompletenessConstraint(Completeness(productName,None))",
      "ComplianceConstraint(Compliance(amount is non-negative,COALESCE(amount, 0.0) >= 0,None))",
      "MaximumConstraint(Maximum(amount,None))"
    )
  }
 
}
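
The anomaly detection test above can be read as a simple ratio test between the two stored Size metrics. The snippet below is a simplified plain-Scala sketch of that idea, assuming the strategy compares new value / old value against the configured bounds. The object and function names (RateOfChangeSketch, isAnomaly) are hypothetical, and the sketch does not settle details such as whether Deequ treats the bound itself as acceptable.

```scala
// Simplified illustration of a relative rate-of-change check; not Deequ's code.
object RateOfChangeSketch {

  // Flags the new value when current / previous leaves the accepted range.
  // A bound left as None is not checked at all.
  def isAnomaly(previous: Double, current: Double,
                maxRateDecrease: Option[Double] = None,
                maxRateIncrease: Option[Double] = None): Boolean = {
    require(previous != 0d, "the previous value must be non-zero to compute a ratio")
    val ratio = current / previous
    maxRateIncrease.exists(ratio > _) || maxRateDecrease.exists(ratio < _)
  }

  def main(args: Array[String]): Unit = {
    // Sizes stored by the two runs of the test: 6 rows, then 14 rows.
    // 14 / 6 is about 2.33, so it exceeds the accepted 2x increase.
    println(isAnomaly(previous = 6d, current = 14d, maxRateIncrease = Some(2d)))
    // 12 / 6 is exactly 2, which this sketch still accepts.
    println(isAnomaly(previous = 6d, current = 12d, maxRateIncrease = Some(2d)))
  }
}
```

With 6 rows in the first run and 14 in the second, the ratio goes above 2, which is why the test expects the Warning status.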

Apache Griffin

The second framework I'd like to cover here is Apache Griffin. As far as I understand, it works differently from Deequ because it's more a platform that you configure than a library that you call from your code. To make it work, you have to define an environment file with the output sinks and a data quality file with the inputs and validation rules. Once they are defined, you can launch org.apache.griffin.measure.Application directly with spark-submit.

At a lower level, Griffin is based on 3 main layers called define, measure and analyze. In the first one you define the scope of your validation, so all the rules, thresholds and goals. The measure layer takes these definitions and applies them to one of the available data sources (Hadoop, RDBMS, Kafka). It generates metrics that are later stored in the metrics repository of the analyze layer, which is the exposition part used to visualize and eventually fine-tune the measures. A nice thing about Griffin is that it offers a unified model for batch and streaming data validation.

In my test I will use a standalone Apache Spark 2.2.0 distribution. To run Griffin, you can also use the Docker images from GitHub, but launching it standalone was a faster option for me. After downloading the measure JAR from the Maven Central repository, I generated 2 datasets with this code:

import org.apache.spark.sql.{SaveMode, SparkSession}

object GriffinEmptyDataGenerator extends App {

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Generate test data for Griffin test")
    .config("spark.sql.shuffle.partitions", "1")
    .master("local[*]").getOrCreate()
  import sparkSession.implicits._

  Seq(
    GriffinTestDataset(1L, None), GriffinTestDataset(2L, Some("test")),
    GriffinTestDataset(3L, Some("test 2")), GriffinTestDataset(4L, None),
    GriffinTestDataset(5L, Some("test 3")), GriffinTestDataset(5L, Some("test 3")), GriffinTestDataset(6L, None)
  ).toDF().coalesce(1).write.mode(SaveMode.Overwrite).format("avro").save("/tmp/source")
  Seq(
    GriffinTestDataset(1L, None), GriffinTestDataset(2L, Some("test")),
    GriffinTestDataset(3L, Some("test 2")), GriffinTestDataset(4L, None),
    GriffinTestDataset(5L, Some("test 3"))
  ).toDF().coalesce(1).write.mode(SaveMode.Overwrite).format("avro").save("/tmp/target")

}

case class GriffinTestDataset(id: Long, label: Option[String])

After that I noted the names of both generated files and put them into the data quality file below (inspired by the official repository):

{
  "name": "dist_batch",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "/tmp/source/part-00000-d2f17312-11e2-4e23-854a-c0e004d160e4-c000.avro"
          }
        }
      ]
    },
    {
      "name": "target",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "/tmp/target/part-00000-b18c5621-2458-46e8-8958-e12488c5cd42-c000.avro"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "distinct",
        "out.dataframe.name": "dist",
        "rule": "id",
        "details": {
          "source": "source",
          "target": "target",
          "total": "total",
          "distinct": "distinct",
          "dup": "dup",
          "num": "num",
          "duplication.array": "dup"
        },
        "out":[
          {
            "type": "metric",
            "name": "distinct"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE"]
}

As you can see, I want to check for duplicates. The last file you have to create defines the environment and looks like this:

{
  "spark": {
    "log.level": "INFO",
    "config": {
      "spark.master": "local[*]"
    }
  },
  "sinks": [
    {
      "type": "console"
    }
  ]
}

After executing Griffin with spark-submit you should see a few measures in the logs:

bartosz:~/programming/spark-2.2.0-bin-hadoop2.7/bin$ ./spark-submit --class org.apache.griffin.measure.Application --master local[*]  --driver-memory 1g --executor-memory 1g --num-executors 2 /home/bartosz/workspace/griffin_test/measure-0.5.0.jar /home/bartosz/workspace/griffin_test/env.json /home/bartosz/workspace/griffin_test/data_quality_csv.json

The measures you should see are:

INFO DAGScheduler: Job 5 finished: collect at MetricWriteStep.scala:79, took 2.472455 s
dist_batch [1579100143771] metrics:
{"name":"dist_batch","tmst":1579068286000,"value":{"total":7,"distinct":6,"dup":[{"dup":1,"num":1}]},"applicationId":"local-1579068286000"}
[1579068286000] 1579068286100: process using time: 12177 ms
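
The figures in this output can be recomputed by hand from the ids written to /tmp/source. The plain-Scala sketch below shows my reading of the metric: total is the number of rows, distinct the number of different ids, and every dup entry tells how many ids (num) have a given number of extra copies (dup). This is an interpretation of the output rather than Griffin's implementation, and the names (DistinctSketch, DupBucket, DistinctMetrics, measure) are made up for the illustration.

```scala
// Plain Scala re-computation of the figures printed by the "distinct" rule;
// an interpretation of the output, not Griffin's implementation.
object DistinctSketch {

  final case class DupBucket(dup: Int, num: Int)
  final case class DistinctMetrics(total: Int, distinct: Int, dup: Seq[DupBucket])

  def measure(ids: Seq[Long]): DistinctMetrics = {
    // How many times each id occurs.
    val occurrences: Map[Long, Int] =
      ids.groupBy(identity).map { case (id, group) => id -> group.size }
    // dup = extra copies of an id, num = how many ids have that many extra copies.
    val buckets = occurrences.values
      .map(_ - 1)
      .filter(_ > 0)
      .groupBy(identity)
      .map { case (dup, sameDup) => DupBucket(dup, sameDup.size) }
      .toSeq
      .sortBy(_.dup)
    DistinctMetrics(total = ids.size, distinct = occurrences.size, dup = buckets)
  }

  def main(args: Array[String]): Unit = {
    // Ids written to /tmp/source by the generator: 7 rows, id 5 appears twice.
    println(measure(Seq(1L, 2L, 3L, 4L, 5L, 5L, 6L)))
  }
}
```

For the 7 source rows it gives a total of 7, 6 distinct ids and a single bucket with dup 1 and num 1, the same values as in the log.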

In this article you can see that Cerberus, so often mentioned on this blog, is not the only data quality framework. Deequ and Apache Griffin are 2 alternatives. The former relies on a Scala DSL executed on top of Apache Spark. Griffin prefers configuration over code and, IMHO, it's a little bit harder to start with. Initially I wanted to run my test on CSV files which, according to the GitHub documentation, are supported. Unfortunately, I didn't manage to make them work because of a "data source not found" error. It only worked when I took Avro and the example defined in the project's /resources directory. But do not consider this as guidance. It's only a personal experience after a few hours of playing with both solutions. Try them yourself and form your own opinion!
