Last November I spoke at the Paris.py meetup about integrating Cerberus with PySpark to enhance JSON validation. During the talk I covered some points that I would like to share with you in this blog post, mostly about error definitions and normalization during validation.
The post is composed of two main parts. In the first one, I will focus on another way to pass error messages in a custom error handler. If you remember my previous post about Cerberus + PySpark (Validating JSON with Apache Spark and Cerberus), I used a method that adds plain error messages. After some quick research, I found that there is another way. In the second part, I will cover an aspect that may help you accelerate the data validation process: normalization.
Custom error code
Previously I was dealing with custom validation rule errors by doing this:
def _validate_productexists(self, lookup_table, field, value):
    if lookup_table == 'memory':
        existing_items = ['item1', 'item2', 'item3', 'item4']
        not_existing_items = list(filter(lambda item_name: item_name not in existing_items, value))
        if not_existing_items:
            self._error(field, "{} items don't exist in the lookup table".format(not_existing_items))
It was fine because I just wanted to discover the error rather than expose it to the rest of the system. To expose it, I had to find a way to specify an error code specific to that custom error. Some error codes are already reserved and you can find the list in the cerberus.errors package:
ErrorDefinition = namedtuple('ErrorDefinition', 'code, rule')
"""
This class is used to define possible errors. Each distinguishable error is
defined by a *unique* error ``code`` as integer and the ``rule`` that can
cause it as string.
The instances' names do not contain a common prefix as they are supposed to be
referenced within the module namespace, e.g. ``errors.CUSTOM``.
"""

# custom
CUSTOM = ErrorDefinition(0x00, None)

# existence
DOCUMENT_MISSING = ErrorDefinition(0x01, None)  # issues/141
DOCUMENT_MISSING = "document is missing"
REQUIRED_FIELD = ErrorDefinition(0x02, 'required')
UNKNOWN_FIELD = ErrorDefinition(0x03, None)
DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies')
The errors are later set in the default Validator class by calling the self._error method:
def _validate_allowed(self, allowed_values, field, value):
    """ {'type': 'container'} """
    if isinstance(value, Iterable) and not isinstance(value, _str_type):
        unallowed = set(value) - set(allowed_values)
        if unallowed:
            self._error(field, errors.UNALLOWED_VALUES, list(unallowed))
    else:
        if value not in allowed_values:
            self._error(field, errors.UNALLOWED_VALUE, value)
From that I figured out that I also needed an ErrorDefinition for my custom validation rule in order to use it in the aggregated output generated by PySpark's mapPartitions method. I ended up with the following code:
from cerberus import Validator
from cerberus.errors import ErrorDefinition

# custom error code, outside the range reserved by cerberus.errors
UNKNOWN_NETWORK = ErrorDefinition(333, 'network_exists')


class ExtendedValidator(Validator):
    def _validate_network_exists(self, allowed_values, field, value):
        if value not in allowed_values:
            self._error(field, UNKNOWN_NETWORK, {})
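As a quick usage sketch (the schema and field values below are made up for illustration, they don't come from the demo project), the custom code then shows up on the collected validation errors and can be aggregated later:

schema = {'network': {'type': 'string', 'network_exists': ['net_a', 'net_b']}}
validator = ExtendedValidator(schema)

validator.validate({'network': 'net_c'})
for error in validator._errors:
    # each ValidationError carries the code defined in the ErrorDefinition
    print(error.document_path, error.code)  # ('network',) 333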
Disabled normalization
I executed my demo pipeline against one 99MB file generated by a local file system data generator. And the validation of that single file on my laptop (4 CPUs, 12GB of memory) took... 6 minutes! At first, I thought that the problem was somewhere in my Spark code but after disabling the validation, I saw that the processing was very fast. I then started to analyze what happened and measured the validation time. It was taking about 15-200 ms for every line, which in total meant a lot.
From that I knew that the .validate() method was the place to look at more closely. I analyzed its signature and wondered why update is False by default and normalize is True:
def validate(self, document, schema=None, update=False, normalize=True)
For update the answer was quite clear. If it's set to True, Cerberus will ignore the required constraints of the validation schema. In other words, it won't detect any missing fields. Regarding normalize, I had to take a closer look at the documentation.
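To make that behavior concrete, here is a minimal sketch (the schema is illustrative, not taken from the demo project) showing how update=True skips the required check:

from cerberus import Validator

schema = {'source': {'type': 'string', 'required': True}}
validator = Validator(schema)

# a full validation flags the missing required field...
print(validator.validate({}))               # False
# ...while an update validation ignores the required constraint
print(validator.validate({}, update=True))  # True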
The normalize flag indicates what Cerberus will do with the validated document before applying the validation rules on it. If the flag is enabled, the framework applies different normalization rules. For instance, it will coerce a given field into the expected type, drop unknown fields (if normalize=True and purge_unknown=True) or set default values on missing fields. And how does it do that? For every normalization group (type coercion, purging unknowns, ...), it iterates over all fields and filters out the ones that are eligible for the normalization. Below you can find an example for purge:
@staticmethod
def __normalize_purge_readonly(mapping, schema):
    for field in [x for x in mapping if schema.get(x, {}).get('readonly', False)]:
        mapping.pop(field)
    return mapping

@staticmethod
def _normalize_purge_unknown(mapping, schema):
    """ {'type': 'boolean'} """
    for field in [x for x in mapping if x not in schema]:
        mapping.pop(field)
    return mapping
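To see those normalization effects on a concrete document, here is a small sketch; the schema and field names are made up for illustration:

from cerberus import Validator

schema = {
    'user_id': {'type': 'integer', 'coerce': int},   # coerced into the expected type
    'status': {'type': 'string', 'default': 'new'},  # default value applied when missing
}
validator = Validator(schema, purge_unknown=True)

document = {'user_id': '42', 'debug_flag': True}     # 'debug_flag' is not in the schema
print(validator.normalized(document))
# {'user_id': 42, 'status': 'new'} - coerced, defaulted and purged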
If you have some normalization rules in your validation schema, you have no choice and you have to keep the normalization on. But on the other hand, if your validation schema doesn't define any normalization rules, normalization won't be required. To disable it, set the normalize flag to False during the validation:
validation_result = validator.validate(row.asDict(recursive=True), normalize=False)
Doing that for my simple dataset of 99MB (200 000 lines) helped to reduce the validation time from 6 to 3 minutes. Also, the validation time of every line decreased from dozens or hundreds of ms to 1-30 ms!
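To put both parts together, here is a hedged sketch of how such a per-partition validation with normalize=False could look; the schema, the field names and the aggregation logic are illustrative and are not the code from the demo repository:

from cerberus import Validator
from pyspark.sql import SparkSession

schema = {'source': {'type': 'string', 'required': True}}

def validate_partition(rows):
    # one Validator instance per partition avoids re-creating it for every line
    validator = Validator(schema)
    invalid_lines = 0
    for row in rows:
        if not validator.validate(row.asDict(recursive=True), normalize=False):
            invalid_lines += 1
    yield invalid_lines

spark = SparkSession.builder.master('local[*]').getOrCreate()
input_dataframe = spark.createDataFrame([('web',), (None,)], ['source'])
print(input_dataframe.rdd.mapPartitions(validate_partition).sum())  # number of invalid lines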
The code I used in my November presentation is on my Github. If you have any other clues on how to accelerate Cerberus validation, feel free to comment on this post or create an issue in the repository.