Dealing with nested data in Apache Spark SQL

Versions: Apache Spark 2.3.1 https://github.com/bartosz25/spark-...scala/com/waitingforcode/sql/nested

Nested data structures are very useful for data denormalization in Big Data systems. They avoid the joins we would otherwise need between several related, fully normalized datasets. But processing such data structures is not always simple. Fortunately, Apache Spark SQL provides different utility functions that help to work with them.

The post is divided into 3 parts. The first one talks about the simplest nested data structure: fully structured data (the same fields everywhere). The second one contains more complex examples with fully structured and repeated data. The last part covers the most complicated case: unstructured (different fields) and repeated data. Each part contains some learning tests with a comment about the generated execution plans.

Fully structured nested data

Working with fully structured nested data is straightforward thanks to the dot notation: we can access nested levels by referencing them with dots. The two examples below show that:

before {
  val oneLevelEntries =
    """
      |{"key": "event_1", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}}
      |{"key": "event_2", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}}
      |{"key": "event_3", "user_context": {"browser": "Google Chrome", "lang": "en-US", "version": "65.0.3325.181"}}
      |{"key": "event_4", "user_context": {"browser": "Google Chrome", "lang": "en-US", "version": "65.0.3325.181"}}
      |{"key": "event_5", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}}
      |{"key": "event_6", "user_context": {"browser": "Opera", "lang": "en-US", "version": "41.0.2353.56"}}
      |{"key": "event_7", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}}
    """.stripMargin
  FileUtils.writeStringToFile(new File(OneLevelFile), oneLevelEntries)
  val twoLevelsEntries =
    """
      |{"key": "event_1", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}, "geo": {"country": {"iso_code": "FR", "name_en": "France"}}}
      |{"key": "event_2", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}, "geo": {"country": {"iso_code": "FR", "name_en": "France"}}}
      |{"key": "event_3", "user_context": {"browser": "Google Chrome", "lang": "en-US", "version": "65.0.3325.181"}, "geo": {"country": {"iso_code": "UK", "name_en": "the United Kingdom "}}}
      |{"key": "event_4", "user_context": {"browser": "Google Chrome", "lang": "en-US", "version": "65.0.3325.181"}, "geo": {"country": {"iso_code": "CN", "name_en": "China"}}}
      |{"key": "event_5", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}, "geo": {"country": {"iso_code": "FR", "name_en": "France"}}}
      |{"key": "event_6", "user_context": {"browser": "Opera", "lang": "en-US", "version": "41.0.2353.56"}, "geo": {"country": {"iso_code": "ES", "name_en": "Spain"}}}}
      |{"key": "event_7", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}, "geo": {"country": {"iso_code": "IT", "name_en": "Italy"}}}
    """.stripMargin
  FileUtils.writeStringToFile(new File(TwoLevelsFile), twoLevelsEntries)
}

"DataFrame" should "read 1-level nested data" in {
  val oneLevelsDataFrame = sparkSession.read.json(OneLevelFile)

  val eventWithUserBrowser = oneLevelsDataFrame.select($"key", $"user_context.browser".as("user_browser"), $"user_context.lang")

  val collectedUsers = eventWithUserBrowser.collect()
    .map(row => (row.getAs[String]("key"), row.getAs[String]("user_browser"), row.getAs[String]("lang")))

  collectedUsers should have size 7
  collectedUsers should contain allOf(("event_1", "Firefox", "en-US"), ("event_2", "Firefox", "en-US"),
    ("event_3", "Google Chrome", "en-US"), ("event_4", "Google Chrome", "en-US"), ("event_5", "Firefox", "en-US"),
    ("event_6", "Opera", "en-US"), ("event_7", "Firefox", "en-US"))
}

"DataFrame" should "read 2-levels nested data" in {
  val twoLevelsDataFrame = sparkSession.read.json(TwoLevelsFile)

  // even for 2-level nested fields the access is direct since the used type is the structure of another structure
  val geoUserStats = twoLevelsDataFrame.select($"key", $"user_context.lang", $"geo.country.iso_code")

  val collectedUserStats = geoUserStats.collect()
    .map(row => (row.getAs[String]("key"), row.getAs[String]("lang"), row.getAs[String]("iso_code")))

  collectedUserStats should have size 7
  collectedUserStats should contain allOf(("event_1", "en-US", "FR"), ("event_2", "en-US", "FR"),
    ("event_3", "en-US", "UK"), ("event_4", "en-US", "CN"), ("event_5", "en-US", "FR"), ("event_6", "en-US", "ES"),
    ("event_7", "en-US", "IT"))
}
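
By the way, the dot notation is only syntactic sugar over struct field extraction. The snippet below is not part of the repository's tests but shows, assuming the same twoLevelsDataFrame, two equivalent ways to reference the same fields:

import org.apache.spark.sql.functions.col

// both alternatives should resolve to the same struct field extraction as the dot notation
val withGetField = twoLevelsDataFrame.select(
  col("key"),
  col("user_context").getField("lang").as("lang"),
  col("geo").getField("country").getField("iso_code").as("iso_code")
)
val withSelectExpr = twoLevelsDataFrame.selectExpr("key", "user_context.lang", "geo.country.iso_code")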

The execution plan for the first projection looks like:

== Parsed Logical Plan ==
'Project [unresolvedalias('key, None), 'user_context.browser AS user_browser#10, unresolvedalias('user_context.lang, None)]
+- AnalysisBarrier
      +- Relation[key#6,user_context#7] json

== Analyzed Logical Plan ==
key: string, user_browser: string, lang: string
Project [key#6, user_context#7.browser AS user_browser#10, user_context#7.lang AS lang#18]
+- Relation[key#6,user_context#7] json

== Optimized Logical Plan ==
Project [key#6, user_context#7.browser AS user_browser#10, user_context#7.lang AS lang#18]
+- Relation[key#6,user_context#7] json

== Physical Plan ==
*(1) Project [key#6, user_context#7.browser AS user_browser#10, user_context#7.lang AS lang#18]
+- *(1) FileScan json [key#6,user_context#7] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tmp/spark/1_level.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:string,user_context:struct<browser:string,lang:string,version:string>>

As you can see, there is nothing magical here. The nested data is simply read as a structure, which makes the dot access possible. A more interesting point concerns the physical data organization. If you remember from the post about Spark Project Tungsten, the rows of a Dataset are organized in arrays of bytes. Do nested data structures change anything? After some tests made with a debugger and breakpoints, I found that a nested data structure doesn't change anything. For a sample entry {"key": "xx", "user_context": {"browser": "Firefox", "lang": "en-US", "version": "59.0.3"}}, the bytes stored in the UnsafeRow are represented as:

[0,1800000002,2000000038,7878,0,2000000007,2800000005,3000000006,786f6665726946,53552d6e65,332e302e3935]

The nested user_context field starts after the second 0 bit marker and the 3 consecutive numbers that follow represent the positions of the 3 nested fields. Accessing them is possible thanks to the getStruct(ordinal: Int, numFields: Int) method. We can see its use in the generated code:

/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (scan_mutableStateArray[0].hasNext()) {
/* 029 */       InternalRow scan_row = (InternalRow) scan_mutableStateArray[0].next();
/* 030 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 031 */       boolean scan_isNull = scan_row.isNullAt(0);
/* 032 */       UTF8String scan_value = scan_isNull ? null : (scan_row.getUTF8String(0));
/* 033 */       boolean scan_isNull1 = scan_row.isNullAt(1);
/* 034 */       InternalRow scan_value1 = scan_isNull1 ? null : (scan_row.getStruct(1, 3));
/* 035 */       boolean project_isNull1 = scan_isNull1;
/* 036 */       UTF8String project_value1 = null;
/* 037 */
/* 038 */       if (!scan_isNull1) {
/* 039 */         if (scan_value1.isNullAt(1)) {
/* 040 */           project_isNull1 = true;
/* 041 */         } else {
/* 042 */           project_value1 = scan_value1.getUTF8String(1);
/* 043 */         }
/* 044 */
/* 045 */       }
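
If you want to reproduce these observations without a debugger, the following sketch can help. It's not part of the repository's tests and assumes the sparkSession and OneLevelFile used above; it prints both the generated code and the UnsafeRow content:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.debug._
import sparkSession.implicits._

val projectedEvents = sparkSession.read.json(OneLevelFile)
  .select($"key", $"user_context.browser".as("user_browser"), $"user_context.lang")

// prints the whole-stage generated Java code (the processNext() method quoted above)
projectedEvents.debugCodegen()

// queryExecution.toRdd exposes the InternalRows; UnsafeRow.toString prints the
// hexadecimal words shown in this post
sparkSession.read.json(OneLevelFile).queryExecution.toRdd.collect().foreach {
  case unsafeRow: UnsafeRow => println(unsafeRow)
  case otherRow => println(otherRow)
}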

But what happens for deeper nested structures, for instance the ones from the 2nd example? Are they also considered part of the main row? Let's take a slightly modified row from the 2nd test: {"key": "event_1", "geo": {"country": {"iso_code": "FR", "name_en": "France"}}}. After some analysis with breakpoints, we can see that deeply nested data structures follow exactly the same rule as flat ones - every level is separated by a 0 bit marker, as shown in the following array representing the tested row:

# 1800000038,5000000007 ==> "key" position + value
# 0,1000000028 ==> "geo"
# 0,1000000028,0,1800000002 ==> "geo.country"
[0,1800000038,5000000007,0,1000000028,0,1800000002,2000000006,5246,65636e617246,315f746e657665]

And unsurprisingly, the access to this deep structure is made with 2 calls to the getStruct method in the generated code:

/* 034 */       InternalRow scan_value = scan_isNull ? null : (scan_row.getStruct(0, 1));
/* 035 */       boolean project_isNull2 = scan_isNull;
/* 036 */       InternalRow project_value2 = null;
/* 037 */
/* 038 */       if (!scan_isNull) {
/* 039 */         if (scan_value.isNullAt(0)) {
/* 040 */           project_isNull2 = true;
/* 041 */         } else {
/* 042 */           project_value2 = scan_value.getStruct(0, 2);
/* 043 */         }
/* 044 */
/* 045 */       }
/* 046 */       boolean project_isNull1 = project_isNull2;
/* 047 */       UTF8String project_value1 = null;
/* 048 */
/* 049 */       if (!project_isNull2) {
/* 050 */         if (project_value2.isNullAt(0)) {
/* 051 */           project_isNull1 = true;
/* 052 */         } else {
/* 053 */           project_value1 = project_value2.getUTF8String(0);
/* 054 */         }
/* 055 */
/* 056 */       }
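
To make these two getStruct calls more tangible, here is a small hand-written traversal doing the same work on an InternalRow. It's not part of the repository; the ordinals come from the alphabetically inferred JSON schema, where geo is the first top-level field and iso_code the first field of country:

import org.apache.spark.sql.catalyst.InternalRow

// mirrors the generated code above: geo is read as a 1-field struct (country) and
// country as a 2-field struct (iso_code, name_en)
def extractIsoCode(row: InternalRow): String = {
  if (row.isNullAt(0)) null
  else {
    val geo = row.getStruct(0, 1)
    if (geo.isNullAt(0)) null
    else {
      val country = geo.getStruct(0, 2)
      if (country.isNullAt(0)) null else country.getUTF8String(0).toString
    }
  }
}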

Fully structured repeated nested data

Let's now see what happens with repeated fields containing nested data structures, such as arrays:

private val RepeatedNestedFile = "/tmp/spark/repeated_nested.json"

before {
  val oneLevelEntries =
    """
      |{"user": "user_1", "orders": [{"id": "order#1", "amount": 39.5}, {"id": "order#2", "amount": 41.5}]}
      |{"user": "user_2", "orders": [{"id": "order#3", "amount": 112.5}]}
      |{"user": "user_3", "orders": []}
      |{"user": "user_4", "orders": [{"id": "order#4", "amount": 15.5}, {"id": "order#5", "amount": 21.5}, {"id": "order#6", "amount": 5.25}]}
      |{"user": "user_5", "orders": [{"id": "order#7", "amount": 31.33}, {"id": "order#8", "amount": 32}, {"id": "order#9", "amount": 10}, {"id": "order#10", "amount": 11}]}
      |{"user": "user_6", "orders": [{"id": "order#11", "amount": 15.11}]}
      |{"user": "user_7", "orders": [{"id": "order#12", "amount": 99}, {"id": "order#13", "amount": 45}]}
    """.stripMargin
  FileUtils.writeStringToFile(new File(RepeatedNestedFile), oneLevelEntries)
}

"DataFrame" should "repeated nested data with explode function" in {
  val oneLevelsDataFrame = sparkSession.read.json(RepeatedNestedFile)

  val ordersByUser = oneLevelsDataFrame.select($"user", explode($"orders").as("order"))

  val ordersByUserToCheck = ordersByUser.collect()
    .map(row => {
      val order = row.getAs[Row]("order")
      (row.getAs[String]("user"), order.getAs[String]("id"), order.getAs[Double]("amount"))
    })
  ordersByUserToCheck should have size 13
  ordersByUserToCheck should contain allOf(("user_1", "order#1", 39.5D), ("user_1", "order#2", 41.5D),
    ("user_2", "order#3", 112.5D), ("user_4", "order#4", 15.5D), ("user_4", "order#5", 21.5D), ("user_4", "order#6", 5.25D),
    ("user_5", "order#7", 31.33D), ("user_5", "order#8", 32.0D), ("user_5", "order#9", 10.0D), ("user_5", "order#10", 11.0D),
    ("user_6", "order#11", 15.11D), ("user_7", "order#12", 99.0D), ("user_7", "order#13", 45.0D))
}

"DataFrame" should "repeated nested data with explode_outer function" in {
  val oneLevelsDataFrame = sparkSession.read.json(RepeatedNestedFile)

  val ordersByUser = oneLevelsDataFrame.select($"user", explode_outer($"orders").as("order"))

  // explode_outer is very similar to explode
  // The difference is that it also takes empty arrays and maps and transforms them to null entries
  val ordersByUserToCheck = ordersByUser.collect()
    .map(row => {
      val order = row.getAs[Row]("order")
      if (order == null) {
        (row.getAs[String]("user"), null, null)
      } else {
        (row.getAs[String]("user"), order.getAs[String]("id"), order.getAs[Double]("amount"))
      }
    })
  ordersByUserToCheck should have size 14
  ordersByUserToCheck should contain allOf(("user_1", "order#1", 39.5D), ("user_1", "order#2", 41.5D),
    ("user_2", "order#3", 112.5D), ("user_4", "order#4", 15.5D), ("user_4", "order#5", 21.5D), ("user_4", "order#6", 5.25D),
    ("user_5", "order#7", 31.33D), ("user_5", "order#8", 32.0D), ("user_5", "order#9", 10.0D), ("user_5", "order#10", 11.0D),
    ("user_6", "order#11", 15.11D), ("user_7", "order#12", 99.0D), ("user_7", "order#13", 45.0D), ("user_3", null, null))
}

"DataFrame" should "repeated nested data with posexplode function" in {
  val oneLevelsDataFrame = sparkSession.read.json(RepeatedNestedFile)

  // posexplode behaves like explode except that it returns an extra column called `pos` indicating
  // the index of the extracted element
  // It also has an outer variant (posexplode_outer) that, exactly as explode_outer, also returns the rows
  // without any element to extract - see the sketch after this test
  val ordersByUserWithOrderIndex = oneLevelsDataFrame.select($"user", posexplode($"orders"))

  val schema = ordersByUserWithOrderIndex.schema
  schema.simpleString shouldEqual "struct<user:string,pos:int,col:struct<amount:double,id:string>>"
  val ordersByUserWithOrderNrToCheck = ordersByUserWithOrderIndex.collect()
    .map(row => {
      val order = row.getAs[Row]("col")
      (row.getAs[String]("user"), row.getAs[Int]("pos"), order.getAs[String]("id"), order.getAs[Double]("amount"))
    })
  println(s"ordersByUserWithOrderNrToCheck=${ordersByUserWithOrderNrToCheck.mkString(",")}")
  ordersByUserWithOrderNrToCheck should have size 13
  ordersByUserWithOrderNrToCheck should contain allOf(("user_1", 0, "order#1", 39.5D), ("user_1", 1, "order#2", 41.5D),
    ("user_2", 0, "order#3", 112.5D), ("user_4", 0, "order#4", 15.5D), ("user_4", 1, "order#5", 21.5D), ("user_4", 2, "order#6", 5.25D),
    ("user_5", 0, "order#7", 31.33D), ("user_5", 1, "order#8", 32.0D), ("user_5", 2, "order#9", 10.0D), ("user_5", 3, "order#10", 11.0D),
    ("user_6", 0, "order#11", 15.11D), ("user_7", 0, "order#12", 99.0D), ("user_7", 1, "order#13", 45.0D))
}
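
Even though there is no dedicated test for it here, the outer variant mentioned in the comment can be sketched like this, reusing the same RepeatedNestedFile:

import org.apache.spark.sql.functions.posexplode_outer

// posexplode_outer keeps the rows with empty arrays and returns null pos/col for them,
// so user_3 should appear as (user_3, null, null)
val ordersWithIndexOrNull = sparkSession.read.json(RepeatedNestedFile)
  .select($"user", posexplode_outer($"orders"))
ordersWithIndexOrNull.show(false)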

"DataFrame" should "extract Map fields with explode function" in {
  // Explode also applies on Map type:
  val users = Seq(
    (1, Map("user" -> "user_1", "city" -> "London")),
    (2, Map("user" -> "user_2", "city" -> "Paris")),
    (3, Map("user" -> "user_3", "city" -> "Warsaw"))
  ).toDF("id", "user_info")

  val flattenedUsers = users.select($"id", explode($"user_info"))

  // explode applied on a Map flattens the structure by extracting every key-value pair and putting it beside
  // the root row. The extracted columns are called "key" for the Map's key and "value" for its value
  flattenedUsers.schema.simpleString shouldEqual "struct<id:int,key:string,value:string>"
  val flattenedUsersMapped = flattenedUsers.collect().map(row => (row.getAs[Int]("id"), row.getAs[String]("key"), row.getAs[String]("value")))

  flattenedUsersMapped should have size 6
  flattenedUsersMapped should contain allOf((1, "user", "user_1"), (1, "city", "London"), (2, "user", "user_2"),
    (2, "city", "Paris"), (3, "user", "user_3"), (3, "city", "Warsaw"))
}

As you can see, to deal with repeated data we can use the explode(column: Column) function. It creates a new row for every entry of the exploded array field. In terms of execution plan, it generates something similar to:

== Parsed Logical Plan ==
'Project [unresolvedalias('user, None), explode('orders) AS order#10]
+- AnalysisBarrier
      +- Relation[orders#6,user#7] json

== Analyzed Logical Plan ==
user: string, order: struct<amount:double,id:string>
Project [user#7, order#15]
+- Generate explode(orders#6), false, [order#15]
   +- Relation[orders#6,user#7] json

== Optimized Logical Plan ==
Generate explode(orders#6), [0], false, [order#15]
+- Relation[orders#6,user#7] json

== Physical Plan ==
Generate explode(orders#6), [user#7], false, [order#15]
+- *(1) FileScan json [orders#6,user#7] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tmp/spark/repeated_nested.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<orders:array<struct<amount:double,id:string>>,user:string>

The execution plan doesn't say a lot about explode but the generated code does:

/* 082 */         for (int index = 0; index < numElements; index++) {
/* 083 */           if (tmpInput.isNullAt(index)) {
/* 084 */             mutableStateArray3[0].setNull(index);
/* 085 */           } else {
/* 086 */
/* 087 */             final int tmpCursor1 = mutableStateArray1[0].cursor;
/* 088 */
/* 089 */             final InternalRow tmpInput1 = tmpInput.getStruct(index, 2);
/* 090 */             if (tmpInput1 instanceof UnsafeRow) {
/* 091 */
/* 092 */               final int sizeInBytes = ((UnsafeRow) tmpInput1).getSizeInBytes();
/* 093 */               // grow the global buffer before writing data.
/* 094 */               mutableStateArray1[0].grow(sizeInBytes);
/* 095 */               ((UnsafeRow) tmpInput1).writeToMemory(mutableStateArray1[0].buffer, mutableStateArray1[0].cursor);
/* 096 */               mutableStateArray1[0].cursor += sizeInBytes;
/* 097 */
/* 098 */             } else {
/* 099 */               mutableStateArray2[1].reset();
/* 100 */
/* 101 */
/* 102 */               if (tmpInput1.isNullAt(0)) {
/* 103 */                 mutableStateArray2[1].setNullAt(0);
/* 104 */               } else {
/* 105 */                 mutableStateArray2[1].write(0, tmpInput1.getDouble(0));
/* 106 */               }
/* 107 */
/* 108 */
/* 109 */               if (tmpInput1.isNullAt(1)) {
/* 110 */                 mutableStateArray2[1].setNullAt(1);
/* 111 */               } else {
/* 112 */                 mutableStateArray2[1].write(1, tmpInput1.getUTF8String(1));
/* 113 */               }
/* 114 */             }
/* 115 */
/* 116 */             mutableStateArray3[0].setOffsetAndSize(index, tmpCursor1, mutableStateArray1[0].cursor - tmpCursor1);
/* 117 */
/* 118 */           }
/* 119 */         }

The explode operation is not complicated since it consists of extracting a structure from the list and persisting it in memory as a new row. Internally, an array is represented as a variable-length field where the entries are separated by a 0 bit marker. For instance, {"user": "user_1", "orders": [{"id": "order#1", "amount": 39.5}, {"id": "order#2", "amount": 41.5}]} is represented as the following UnsafeRow:

[0,1800000060,7800000006,2,0,2000000020,4000000020,0,4043c00000000000,1800000007,3123726564726f,0,4044c00000000000,1800000007,3223726564726f,315f72657375]

The array's length is set to 2 and the 0 following it marks the beginning of a nested object. Later, 2 other 0s are used to separate the order entries inside the array (4043c00000000000,1800000007,3123726564726f and 4044c00000000000,1800000007,3223726564726f).
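
For completeness, the same flattening can also be written in plain SQL with LATERAL VIEW. The sketch below is not part of the repository's tests and the view name is invented for the example:

sparkSession.read.json(RepeatedNestedFile).createOrReplaceTempView("users_orders")

// LATERAL VIEW EXPLODE produces one output row per array entry, exactly as the DataFrame API
val ordersByUserSql = sparkSession.sql(
  """SELECT `user`, flattened_order.id, flattened_order.amount
    |FROM users_orders
    |LATERAL VIEW EXPLODE(orders) exploded_orders AS flattened_order""".stripMargin)
ordersByUserSql.show(false)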

Unstructured repeated nested data

Finally, let's see what happens with unstructured data:

private val OneLevelFile = "/tmp/spark/1_level_unpredictable.json"

before {
  val oneLevelEntries =
    """
      |{"key": "event_1", "user_context": {"browser": "Firefox", "lang": "en-US"}}
      |{"key": "event_2", "user_context": {"browser": "Firefox"}}
      |{"key": "event_3", "user_context": {"browser": "Google Chrome"}}
      |{"key": "event_4", "geo": {"city": "London", "loc": {"lan": 0, "lat": 1}}}
      |{"key": "event_5", "user_context": {"browser": "Firefox", "lang": "en-US"}}
      |{"key": "event_6", "user_context": {"browser": "Opera", "lang": "en-US"}}
      |{"key": "event_7", "user_context": {"browser": "Firefox", "lang": "en-US"}}
    """.stripMargin
  FileUtils.writeStringToFile(new File(OneLevelFile), oneLevelEntries)
}

"DataFrame" should "read 1-level nested data" in {
  val unstructuredDataSchema = StructType(Seq(
    StructField("key", StringType), StructField("user_context", StructType(
      Seq(StructField("browser", StringType), StructField("lang", StringType))
    )), StructField("geo", StructType(
      Seq(StructField("city", StringType), StructField("loc", StructType(
        Seq(StructField("lat", IntegerType), StructField("lon", IntegerType))
      )))
    ))
  ))
  // Since the data schema is not consistent, it's better to specify the schema explicitly rather than letting Apache Spark
  // infer it from sampled lines. It's especially true when the sampling doesn't cover all the data (and it doesn't, for
  // performance reasons!), and thus it can miss some fields
  val oneLevelsDataFrame = sparkSession.read.schema(unstructuredDataSchema).json(OneLevelFile)

  val eventWithUserBrowserAndCity = oneLevelsDataFrame.select($"key",
    $"user_context.browser".as("user_browser"), $"geo.city", $"geo.loc.lat")

  val collectedUsers = eventWithUserBrowserAndCity.collect()
    .map(row => (row.getAs[String]("key"), row.getAs[String]("user_browser"), row.getAs[String]("city"), row.getAs[Int]("lat")))

  // As you can see, and it's pretty expected, the returned rows have holes for the values missing in the data source
  // You can also see that dealing with unstructured data, even in Spark's structured abstraction which is the SQL module,
  // is possible. The requirement, however, is to define the schema explicitly in order to avoid bad surprises at
  // runtime (e.g. only half of the schema being discovered).
  // The same rule applies to nested fields deeper than 1 level, as shown here with the `geo.loc` column
  // However, if writing the schema manually is too tedious and you need to keep the source schema,
  // you can always consider using the lower-level RDD API instead (see the sketch after this test). If you used a manual
  // schema with 30 fields belonging to 1 kind of data and 30 other fields belonging to another kind, you'd end up with
  // a DataFrame having 60 fields, half of them empty. Often that's not acceptable, for example in an application
  // responsible for validating message formats and dispatching them to another sink (e.g. a Kafka topic with valid messages)
  collectedUsers should have size 7
  collectedUsers should contain allOf(("event_1", "Firefox", null, null), ("event_2", "Firefox", null, null),
    ("event_3", "Google Chrome", null, null), ("event_4", null, "London", 1), ("event_5", "Firefox", null, null),
    ("event_6", "Opera", null, null), ("event_7", "Firefox", null, null))
}
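
As a rough illustration of the RDD-style alternative mentioned in the comments, the sketch below keeps the raw JSON lines and routes them by kind before applying a much smaller schema to each subset. It's not part of the repository and the routing rule is purely illustrative:

import sparkSession.implicits._

// raw lines; an RDD[String] obtained with sparkContext.textFile would work the same way
val rawEvents = sparkSession.read.textFile(OneLevelFile)
  .map(_.trim).filter(_.nonEmpty)

// naive dispatching rule, invented for the example
val userContextEvents = rawEvents.filter(_.contains("\"user_context\""))
val geoEvents = rawEvents.filter(_.contains("\"geo\""))

// each subset can now be parsed with its own, much smaller schema
val userContextDataFrame = sparkSession.read.json(userContextEvents)
val geoDataFrame = sparkSession.read.json(geoEvents)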

As noted in the comments, since we're working on unstructured data with a framework designed for structured data, we need to create a schema taking all possible fields into account. Needless to say, managing such a schema is quite challenging, and letting the engine infer it is not reliable without reading all possible values (otherwise some fields may be missed). But does this schema have an impact on the rows stored internally by Apache Spark? To see what happens, let's execute a small test:

val testFileEntries =
    """|{"key": "event_4",  "user_context": {"browser": "Firefox"}, "geo": {"city": "London", "loc": {"lan": 0, "lat": 1}}}
      |{"key": "event_2", "user_context": {"browser": "Firefox"}}""".stripMargin
FileUtils.writeStringToFile(new File(TestFile), testFileEntries)

val unstructuredDataSchema = StructType(Seq(
  StructField("key", StringType), StructField("user_context", StructType(
    Seq(StructField("browser", StringType), StructField("lang", StringType))
  )), StructField("geo", StructType(
    Seq(StructField("city", StringType), StructField("loc", StructType(
      Seq(StructField("lat", IntegerType), StructField("lan", IntegerType))
    )))
  ))
))
  
val oneLevelsDataFrame = sparkSession.read.schema(unstructuredDataSchema).json(TestFile).repartition(1)

// breakpoint added at the next line
oneLevelsDataFrame.collect()

After adding some breakpoints in the UnsafeRow class, we can easily figure out how Apache Spark stores these inconsistent rows:

# event_4
[0,2000000007,2800000020,4800000038,345f746e657665,2,1800000007,0,786f6665726946,0,1800000006,2000000018,6e6f646e6f4c,0,1,0]

# event_2
[4,2000000007,2800000020,0,325f746e657665,2,1800000007,0,786f6665726946] 

Obviously, the rows are stored as arrays of bytes and the second one is much shorter. Even if this output is not very meaningful, we can deduce that the "0"s inside each array separate the main object from its nested parts. The "4" at the beginning of the event_2 output comes from the null-tracking bit set stored at the start of the UnsafeRow: in binary it's 100, which means that the 3rd top-level field (geo) is null. Internally, the complete row is an array of 128 bytes while the incomplete one takes only 72 bytes.
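
These numbers can be verified without a debugger. The sketch below, assuming the oneLevelsDataFrame from the previous snippet, uses the same queryExecution.toRdd access as before to print the size and the top-level null flags of every row:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

oneLevelsDataFrame.queryExecution.toRdd.collect().foreach {
  case row: UnsafeRow =>
    // the schema declares 3 top-level fields: key, user_context, geo
    val nullFlags = (0 until 3).map(row.isNullAt)
    println(s"${row.getSizeInBytes} bytes, null flags (key, user_context, geo): $nullFlags => $row")
  case otherRow => println(otherRow)
}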

Nested data structures are a powerful concept letting us group data with similar characteristics in one place. As shown throughout this post, Apache Spark provides a lot of methods to work with such structures. The simplest one is the dot notation that gives access to nested properties by simply separating the levels with dots. Another method, used mainly for repeated values, consists of using the explode function and its variants. It flattens the row by joining every repeated value with its parent. As shown in the last example with nested but semi-structured data, Apache Spark is also able to optimize the storage and skip the missing fields in the underlying byte array.

