DataFrames for analytics - Glue DynamicFrame

When I came to the data world, I had no idea what data governance was. One of the tools that helped me understand it was AWS Glue. I had a chance to work with it again during my AWS Big Data specialty exam preparation, and that's when I asked myself: "DynamicFrames?! What's the difference from DataFrames?" In this post I'll try to shed some light on it.

Please note that I won't be making any groundbreaking discoveries here. DynamicFrame is closed-source, so this time I'll have to proceed differently. To understand what's going on, I'll analyze the documentation and the stack traces from StackOverflow and AWS forum questions. I'll do that in the 2nd section of the blog post and consider it guessing rather than "the" implementation. Before that, I'll give you some more high-level context by analyzing AWS papers and talks.

DynamicFrame - a DataFrame with per-record schema

AWS Glue is a managed service, aka serverless Spark, that also handles data governance, so everything related to a data catalog. It's the default solution used by another AWS service, Lake Formation, to handle data schema evolution on S3 data lakes. It's composed of 3 main parts: a crawler to identify new schemas or partitions in the dataset, a scheduler to manage triggering, and ETL jobs to execute data transformations, for instance data cleansing.

So what makes AWS Glue specific compared to Apache Spark SQL? AWS Glue is optimized for ETL jobs related to data cleansing and data governance, so everything about the already mentioned data catalog and semi-structured data formats like JSON (generally, everywhere the schema cannot be enforced). To address these use cases, Glue uses a different principle than Apache Spark SQL. Since the schema cannot be enforced, Glue prefers to bind it to every record rather than analyzing the data upfront to build the most universal schema possible across the available rows.
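
To visualize that difference, here is a small plain Apache Spark SQL snippet (no Glue involved) showing the "most universal schema" behavior: when the same field comes with 2 conflicting types, Spark's JSON schema inference widens it to a single compromise type for the whole dataset, whereas Glue keeps both variants, each bound to its own record. The object and variable names below are obviously mine:

import org.apache.spark.sql.SparkSession

object UniversalSchemaExample extends App {
  val spark = SparkSession.builder().master("local[*]").appName("universal-schema").getOrCreate()
  import spark.implicits._

  // the "id" field is a number in the first record and a text in the second one
  val inconsistentRecords = Seq("""{"id": 1}""", """{"id": "abc"}""").toDS()

  // Spark SQL analyzes the rows and builds one schema for the whole dataset;
  // the conflicting field is widened to a string so that both records fit in
  spark.read.json(inconsistentRecords).printSchema()
  // root
  //  |-- id: string (nullable = true)
}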

Unfortunately, AWS Glue is closed-source software, so to satisfy my curiosity about the implementation details, I'll have to do some guessing.

Implementation guess

Even though AWS Glue is a serverless service, it runs on top of YARN, probably sharing something with the EMR service. To check that, you can read the CloudWatch logs of your submitted jobs. You should see the spark-submit command we often use with EMR:

/usr/lib/spark/bin/spark-submit ...  --master yarn --deploy-mode cluster ...

Also, the documentation states that "the container running the executor is terminated ("killed") by Apache Hadoop YARN" (Visualize the Profiled Metrics on the AWS Glue Console).

A small digression: "serverless" doesn't mean here that no servers are involved. Glue runs on top of YARN, so on top of the servers managed by this resource manager. It also means that, even though it's "serverless", you may encounter memory problems, as explained in the debugging section of the documentation. End of the digression. You can also see it in the stack traces that I've used to figure out how Glue generates the final schema (well, to get a better idea of my supposition).

But before covering the schema, let's get back to what we can discover "for sure" by analyzing the documentation. Glue's counterpart of the DataFrame is called DynamicFrame. It's basically a wrapper around an RDD[DynamicRecord], where DynamicRecord corresponds to Apache Spark's Row structure. Each DynamicRecord exposes a few methods with a *Field suffix, which makes me think that they're responsible for the "flexible" schema management at the row level. From them, you can check the type of a field, which for Glue is a DynamicNode composed of 2 attributes: a data type and a value. DynamicNode is an abstract class with one implementation per type, like StringNode to represent text, MapNode to store maps, or ArrayNode to store an array of different DynamicNodes. This last type shows pretty well the data cleansing character of Glue. In Apache Spark, I would expect such a data type to be typed and therefore store only 1 possible element type. Glue, since it was designed to handle semi-structured data, can store any of the known data types in it.

At that point, my imaginary implementation would look like:

class DynamicRecord {
  // per-record schema: every field name is bound to a self-describing node
  val fields: Map[String, DynamicNode] = ???
}
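
The nodes themselves could then look like the snippet below. To be clear, only the class names come from the documentation; the attributes and methods are purely my assumptions:

// guessed shape only - StringNode, MapNode and ArrayNode are quoted in the
// documentation, everything else is my assumption
abstract class DynamicNode {
  def getValue: Any      // the stored value
  def typeName: String   // the data type bound to that value
}

class StringNode(value: String) extends DynamicNode {
  override def getValue: Any = value
  override def typeName: String = "string"
}

class MapNode(fields: Map[String, DynamicNode]) extends DynamicNode {
  override def getValue: Any = fields
  override def typeName: String = "map"
}

// unlike Spark's ArrayType, the elements don't have to share a single type
class ArrayNode(elements: Seq[DynamicNode]) extends DynamicNode {
  override def getValue: Any = elements
  override def typeName: String = "array"
}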

Those are the guessed basics, and I'm wondering what memory optimizations are used internally and, more globally, how far from the truth I am. Aside from the embedded schema, Glue can handle different types within the same schema thanks to a ChoiceType field. You can resolve it later and, for instance, cast everything into a single type, drop the columns with invalid types, transform the conflicting column into a struct of all present data types, or split it into multiple columns (eg. field into field_long and field_string). Despite that flexibility, at some point you will need a fixed schema to put into your data catalog. By analyzing stack traces found in different places, I got one shedding a little bit of light on the schema resolution:

# https://forums.aws.amazon.com/message.jspa?messageID=834578

# ...
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1127)
at org.apache.spark.sql.glue.util.SchemaUtils$.fromRDD(SchemaUtils.scala:57)
at com.amazonaws.services.glue.DynamicFrame.recomputeSchema(DynamicFrame.scala:231)
at com.amazonaws.services.glue.DynamicFrame.schema(DynamicFrame.scala:219)

As you can see, everything is driven by a SchemaUtils class using Apache Spark tree aggregations. Just to recall, a tree aggregation computes partial aggregates locally and emits them as partial results for further processing. That's also my intuition behind Glue's schema resolution. First, every partition generates a partial schema computed from the fields associated with each DynamicRecord. Next, these partial schemas are combined into the final one, including the possibly conflicting types.
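
To make that intuition concrete, here is a minimal and fully hypothetical sketch of such a resolution. Nothing in it comes from Glue's code: the per-record schema is simplified to a map of field names to type names, a field observed with more than one type is what would surface as the ChoiceType mentioned before, and the only real piece is Apache Spark's treeAggregate, the primitive visible in the stack trace:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SchemaResolutionGuess extends App {
  val spark = SparkSession.builder().master("local[*]").appName("schema-resolution-guess").getOrCreate()

  // hypothetical per-record schemas, e.g. derived from the DynamicNode types of every DynamicRecord
  val recordSchemas: RDD[Map[String, String]] = spark.sparkContext.parallelize(Seq(
    Map("id" -> "long", "name" -> "string"),
    Map("id" -> "string", "name" -> "string") // "id" comes with a conflicting type here
  ))

  // a partial schema maps every field to the set of types observed so far
  type PartialSchema = Map[String, Set[String]]
  val emptySchema: PartialSchema = Map.empty

  def mergeRecord(schema: PartialSchema, record: Map[String, String]): PartialSchema =
    record.foldLeft(schema) { case (acc, (field, tpe)) =>
      acc.updated(field, acc.getOrElse(field, Set.empty[String]) + tpe)
    }

  def mergeSchemas(left: PartialSchema, right: PartialSchema): PartialSchema =
    right.foldLeft(left) { case (acc, (field, types)) =>
      acc.updated(field, acc.getOrElse(field, Set.empty[String]) ++ types)
    }

  // partial schemas are computed per partition and combined in a tree,
  // which is the pattern suggested by the treeAggregate call in the stack trace
  val finalSchema = recordSchemas.treeAggregate(emptySchema)(mergeRecord, mergeSchemas)

  finalSchema.foreach { case (field, types) =>
    val resolved = if (types.size == 1) types.head else types.mkString("choice<", ",", ">")
    println(s"$field: $resolved")
  }
  // id: choice<long,string> (the order inside the choice may vary)
  // name: string
}

The real implementation probably stores something much richer than plain type names (nested structures at the very least), but the aggregation pattern should be similar.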

That's what my crystal ball says. Even though this implementation guess makes sense, I'm pretty sure I'm missing something important with a real impact on performance and resource usage optimization. But that's hard to say by only looking at the high-level documentation. If you have some guesses to add, please comment! I'd be very happy to get a little bit more context on this interesting data governance-oriented Apache Spark implementation.

