Validating JSON with Apache Spark and Cerberus

on waitingforcode.com

At a recent meetup I heard that one of the most difficult data engineering tasks is ensuring good data quality. I couldn't agree more, which is why in this post I will share one solution for detecting data issues with PySpark (my first PySpark code!) and a Python library called Cerberus.

In the first part of the post I will present Cerberus, its validators, schema definition rules, and some other subtle details. In the second section I will show how to extend the framework, and in the last one how to integrate Cerberus with Apache Spark.

Introduction to Cerberus

Cerberus is something like the Spring Validation module (if you did some classical J2EE stuff) but for Python. The validation is based on a schema of validity rules. The framework also lets you extend the existing rules, which is quite useful when you want to validate values against an external database or apply less universal validation rules.

To work, Cerberus needs a schema, a validator (which may be customized) and some data to validate. The data, unless I missed something in the documentation, must be a Python dictionary. In code, these requirements translate to:

    import datetime

    from cerberus import Validator

    schema = {
        'id': {
            'type': 'integer', 'min': 1
        },
        'amount': {
            'type': 'float'
        },
        'first_order': {
            'type': 'datetime',
            'required': False
        },
        'user': {
            'type': 'dict',
            'schema': {
                'login': {'type': 'string'},
                'email': {'type': 'string', 'regex': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'}
            }
        },
        'items': {
            'type': 'list',
            'empty': False
        }
    }
    valid_entry = {"id": 1, "amount": 30.97, "user": {"login": "user1", "email": "user1@user1.com"},
                   "items": ["item1", "item2", "item3"]}
    schema_validator = Validator(schema)
    is_valid = schema_validator.validate(valid_entry)
    assert is_valid, "No errors should be detected for valid object"

Let's see now how it behaves for an invalid entry:

    invalid_entry = {"id": -3, "amount": 30.97, "user": {"login": "user1", "email": "user1@user1.com"},
                     "items": ["item1"], "first_order": datetime.datetime(2019, 5, 1, 10, 0, 0)}
    # validate() returns False as soon as at least one rule is violated
    is_invalid_entry_valid = schema_validator.validate(invalid_entry)
    print('1) {}'.format(schema_validator.errors['id']))
    assert not is_invalid_entry_valid, "Validator should reject the invalid entry"
    assert 'id' in schema_validator.errors, "An id error should be detected for invalid id field"
    assert schema_validator.errors['id'][0] == 'min value is 1', "The error should be found for id min value"

With the initial definition, the validator also fails for any extra field not defined in the validation schema. To ignore these errors, set the allow_unknown flag to True:

    valid_entry_extra_field = {"id": 3, "amount": 30.97, "user": {"login": "user1", "email": "user1@user1.com"},
                               "items": ["item1"], "first_order": datetime.datetime(2019, 5, 1, 10, 0, 0),
                               "order_status": "CONFIRMED"}
    # validate() returns a boolean: True when the document is valid
    is_valid_extra_field = schema_validator.validate(valid_entry_extra_field)
    assert 'unknown field' in schema_validator.errors['order_status'], "'unknown field' error should be returned after the validation"
    assert not is_valid_extra_field, "An extra field should not be accepted by the validator"

    # Now I reconfigure the validator to allow unknown fields
    schema_validator.allow_unknown = True
    is_valid_extra_field = schema_validator.validate(valid_entry_extra_field, schema)
    assert not schema_validator.errors, "No error should be detected when extra fields are allowed"
    assert is_valid_extra_field, "Validator allowing extra fields should accept the entry"

Extending Cerberus

As I wrote in the previous section, you can also extend Cerberus with custom validators. To do so, you simply extend the base Validator class and define a method named _validate_ followed by the rule name used in the schema. Below you can find an example of a rule called productexists which checks whether a product really exists in the database (in-memory for the sake of simplicity):

    class ExtendedValidator(Validator):
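        def _validate_productexists(self, lookup_table, field, value):
            """Check that every item exists in the given lookup table.

            The rule's arguments are validated against this schema:
            {'type': 'string'}
            """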
            if lookup_table == 'memory':
                existing_items = ['item1', 'item2', 'item3', 'item4']
                not_existing_items = list(filter(lambda item_name: item_name not in existing_items, value))
                if not_existing_items:
                    self._error(field, "{} items don't exist in the lookup table".format(not_existing_items))

    entry_not_existing_item = {"id": 3, "amount": 30.97, "user": {"login": "user1", "email": "user1@user1.com"},
                               "items": ["item1", "item300"], "first_order": datetime.datetime(2019, 5, 1, 10, 0, 0)}

    schema = {
        'id': {
            'type': 'integer', 'min': 1
        },
        'amount': {
            'type': 'float'
        },
        'first_order': {
            'type': 'datetime',
            'required': False
        },
        'user': {
            'type': 'dict',
            'schema': {
                'login': {'type': 'string'},
                'email': {'type': 'string', 'regex': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'}
            }
        },
        'items': {
            'type': 'list',
            'empty': False,
            'productexists': 'memory'
        }
    }
    extended_validator = ExtendedValidator(schema)
    is_valid = extended_validator.validate(entry_not_existing_item)
    assert 'items' in extended_validator.errors, "Custom validator should detect invalid items"
    assert extended_validator.errors['items'][0] == "['item300'] items don't exist in the lookup table", "Unexpected error was generated"
    assert not is_valid, "An entry with non existent item should be considered as invalid"

Another interesting customization feature, at least for the use case described in the next section, is the error handler. As you saw, validation errors are exposed through the errors property of the Validator object. If you want to change their format, you can implement your own error handler on top of BaseErrorHandler and, for example, return only error codes:

    from cerberus.errors import SchemaErrorHandler

    # I extend SchemaErrorHandler to avoid implementing the less meaningful methods
    class ErrorCodesHandler(SchemaErrorHandler):

        def __call__(self, validation_errors):
            output_errors = set()
            for error in validation_errors:
                output_errors.add(error.code)
            return output_errors

    schema = {
        'id': {
            'type': 'integer', 'min': 1
        },
        'amount': {
            'type': 'float'
        },
        'first_order': {
            'type': 'datetime',
            'required': False
        },
        'user': {
            'type': 'dict',
            'schema': {
                'login': {'type': 'string'},
                'email': {'type': 'string', 'regex': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'}
            }
        },
        'items': {
            'type': 'list',
            'empty': False
        }
    }
    validator_with_custom_handler = Validator(schema, error_handler=ErrorCodesHandler())
    invalid_entry = {"id": -3, "amount": '30.97', "user": {"login": "user1", "email": "user1@user1.com"},
                               "items": ["item1"], "first_order": datetime.datetime(2019, 5, 1, 10, 0, 0)}
    # validate() returns a boolean; the codes are read from the errors property
    is_invalid_entry_valid = validator_with_custom_handler.validate(invalid_entry)

    assert 66 in validator_with_custom_handler.errors, "An error with 66 code should be detected for invalid id field"
    assert 36 in validator_with_custom_handler.errors, "An error with 36 code should be detected for invalid amount type"
    assert not is_invalid_entry_valid, "Validator should reject the invalid entry"
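
The numeric codes asserted above are hard to read on their own. Here is a minimal sketch of a lookup helper. The codes 66, 36 and 34 are the ones used in this post, while the constant names are what I believe cerberus.errors calls them (MIN_VALUE, BAD_TYPE, EMPTY_NOT_ALLOWED), so treat the names as an assumption and check them against your installed version. describe_error_codes is a hypothetical helper, not part of Cerberus.

```python
# Codes taken from the assertions in this post; the names are assumed to
# mirror the constants defined in cerberus.errors.
ERROR_CODE_NAMES = {
    0x22: 'EMPTY_NOT_ALLOWED',  # 34, raised by the 'empty': False rule
    0x24: 'BAD_TYPE',           # 36, raised by the 'type' rule
    0x42: 'MIN_VALUE',          # 66, raised by the 'min' rule
}


def describe_error_codes(error_codes):
    """Map numeric codes to readable names, keeping unknown codes visible."""
    return sorted(
        ERROR_CODE_NAMES.get(code, 'UNKNOWN_{}'.format(code))
        for code in error_codes
    )


readable = describe_error_codes({66, 36})
print(readable)
```

Unknown codes are kept in the output rather than dropped, so a rule missing from the table still shows up in the report.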

After this hands-on introduction to Cerberus features, let's see how we can mix it with PySpark to validate JSON files.

PySpark and Cerberus

For the PySpark example using Cerberus I simply want to collect aggregated error codes. If my code seems scary, please give me some suggestions on how to improve it, since it's the first PySpark pipeline I've ever written!

from collections import defaultdict

from cerberus.errors import SchemaErrorHandler
from cerberus import Validator
from pyspark.sql import SparkSession

json_data = [
"""{"id": -1, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 2, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 3, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 4, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 5, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}""",

"""{"id": 6, "name": "", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": -7, "name": "", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": -8, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 9, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
{"id": 10, "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}"""]

for index, logs_to_write in enumerate(json_data):
    input_file_path = "./cerberus_{}.json".format(index)
    # The context manager closes the file even if the write fails
    with open(input_file_path, "w") as input_file:
        input_file.write(logs_to_write)


schema = {
    'id': {
        'type': 'integer', 'min': 1
    },
    'name': {
        'type': 'string',
        'empty': False
    },
    'user': {
        'type': 'dict',
        'schema': {
            'login': {'type': 'string'},
            'email': {'type': 'string', 'regex': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'}
        }
    },
    'items': {
        'type': 'list',
        'empty': False
    }
}


class ErrorCodesHandler(SchemaErrorHandler):

    def __call__(self, validation_errors):
        output_errors = []
        for error in validation_errors:
            output_errors.append(error.code)
        return output_errors


def find_errors(rows):
    errors_map = defaultdict(int)
    extended_validator = Validator(error_handler=ErrorCodesHandler())
    for row in rows:
        row_dict = row.asDict(recursive=True)

        result = extended_validator.validate(row_dict, schema)
        if not result:
            error_codes = list(extended_validator.errors)
            for error_code in error_codes:
                errors_map[error_code] += 1
    return [(k, v) for k, v in errors_map.items()]


def sum_errors_number(errors_count_1, errors_count_2):
    return errors_count_1 + errors_count_2


spark = SparkSession.builder.master("local[*]").appName("Python Spark SQL data validation with Cerberus").getOrCreate()
errors_distribution = spark.read.json("./cerberus*", lineSep='\n').rdd.mapPartitions(find_errors)\
    .reduceByKey(sum_errors_number).collectAsMap()

print('errors_distribution={}'.format(errors_distribution))
assert errors_distribution[66] == 3, '3 ids below the minimum of 1 should be found'
assert errors_distribution[34] == 2, '2 empty names should be found'
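
To make the mapPartitions and reduceByKey steps easier to follow, here is a sketch that mimics them in plain Python, without Spark. The per-row error codes are written by hand from the two sample files (66 for an id below 1, 34 for an empty name). For simplicity it pretends that each file ends up in its own partition, which Spark does not guarantee. count_partition and merge_counts are hypothetical names used only for this illustration.

```python
from collections import defaultdict

# One list per pretend partition; each inner list holds the error codes
# of one invalid row, derived by hand from the sample data.
partitions = [
    [[66]],                    # cerberus_0.json: id -1
    [[34], [66, 34], [66]],    # cerberus_1.json: ids 6, -7 and -8
]


def count_partition(rows_error_codes):
    """Plays the role of find_errors: local (code, count) pairs."""
    counts = defaultdict(int)
    for error_codes in rows_error_codes:
        for error_code in error_codes:
            counts[error_code] += 1
    return list(counts.items())


def merge_counts(pairs):
    """Plays the role of reduceByKey followed by collectAsMap."""
    merged = defaultdict(int)
    for error_code, count in pairs:
        merged[error_code] += count
    return dict(merged)


local_pairs = [pair for partition in partitions
               for pair in count_partition(partition)]
simulated_distribution = merge_counts(local_pairs)
print(simulated_distribution)
```

Because addition is associative and commutative, the final counts stay the same however the rows are split across partitions.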

As you can see, Cerberus integrates pretty easily with Apache Spark. I didn't go very far with the code but I think there is a way to generate an Apache Spark schema directly from a Cerberus validation schema. Some takeaways from my experience:

  • If you have nested fields, remember to do a recursive asDict conversion (row.asDict(recursive=True)). Otherwise the nested entries remain Row objects rather than dicts, which is not the format Cerberus expects.
  • Beware of automatic schema resolution. Some fields may not be inferred with the types you expect in the Cerberus schema. This can happen with inconsistently formatted floats, which Apache Spark may resolve to strings. For example, in the following snippet the amount field will be resolved by Apache Spark to a string:
    {"id": -1, "amount": "99.98", "name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
    {"id": 2,  "amount": 99.98,"name": "some name", "user": {"login": "user1", "email": "user1@user1.com"}, "items": ["item1"]}
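
One way to avoid relying on schema inference, and a possible starting point for generating a Spark schema from the Cerberus one, is to derive a DDL string that can be passed to spark.read.schema(...). The sketch below is only my assumption of how this could look. The type mapping is mine and covers just the types used in this post, list elements are assumed to be strings when the Cerberus schema does not describe them, and cerberus_to_ddl is a hypothetical helper.

```python
# Assumed mapping from Cerberus type names to Spark SQL type names.
CERBERUS_TO_SPARK = {
    'integer': 'BIGINT',
    'float': 'DOUBLE',
    'string': 'STRING',
    'boolean': 'BOOLEAN',
    'datetime': 'TIMESTAMP',
}


def spark_type(rules):
    """Translate the rules of one Cerberus field into a Spark SQL type."""
    field_type = rules['type']
    if field_type == 'dict':
        nested = ', '.join(
            '`{}`: {}'.format(name, spark_type(nested_rules))
            for name, nested_rules in rules['schema'].items())
        return 'STRUCT<{}>'.format(nested)
    if field_type == 'list':
        # Without a nested 'schema' rule the element type is unknown;
        # falling back to STRING is an assumption of this sketch.
        element_rules = rules.get('schema', {'type': 'string'})
        return 'ARRAY<{}>'.format(spark_type(element_rules))
    return CERBERUS_TO_SPARK[field_type]


def cerberus_to_ddl(schema):
    """Build a DDL string with backtick-quoted column names."""
    return ', '.join('`{}` {}'.format(name, spark_type(rules))
                     for name, rules in schema.items())


example_schema = {
    'id': {'type': 'integer', 'min': 1},
    'amount': {'type': 'float'},
    'user': {'type': 'dict', 'schema': {'login': {'type': 'string'},
                                        'email': {'type': 'string'}}},
    'items': {'type': 'list', 'empty': False},
}
ddl = cerberus_to_ddl(example_schema)
print(ddl)
```

With PySpark available, the string could then be used as spark.read.schema(ddl).json(...). The amount column is then declared as a double instead of being inferred as a string. How Spark treats a row whose value does not match the declared type depends on the reader mode, so it is worth checking on your own data.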
    

When I first saw Cerberus, I was quite amazed by how simple it is to define rules and to extend them. In this post I tried to highlight these 2 properties, which are quite important for any validation framework. One drawback I experienced is the inability to validate a JSON line directly, without first converting it into a dict. If you want to delve deeper and discover more features, I added some interesting links after the post.
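
The JSON-to-dict conversion step can at least be kept in one small place. Here is a minimal sketch, assuming the validation itself is delegated to any callable that accepts a dict and returns a boolean (with Cerberus that would be the validate method of a Validator instance). validate_json_line is a hypothetical helper, not a Cerberus API.

```python
import json


def validate_json_line(json_line, validate_dict):
    """Parse one JSON line and validate it; malformed JSON counts as invalid."""
    try:
        document = json.loads(json_line)
    except json.JSONDecodeError:
        return False
    # Cerberus validates dictionaries, so reject top-level arrays or scalars
    if not isinstance(document, dict):
        return False
    return bool(validate_dict(document))


# Stand-in for schema_validator.validate so the sketch runs without Cerberus
def id_is_positive(document):
    return isinstance(document.get('id'), int) and document['id'] >= 1


valid_line_result = validate_json_line('{"id": 2, "name": "some name"}', id_is_positive)
invalid_line_result = validate_json_line('{"id": -1}', id_is_positive)
malformed_line_result = validate_json_line('{"id": ', id_is_positive)
```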

Read also about validating JSON with Apache Spark and Cerberus here: Errors & Error Handling, Validation rules - validator, Cerberus.

If you liked it, you should read: Introduction to data quality
