Two topics, two schemas, one subscription in Apache Spark Structured Streaming

Versions: Apache Spark 2.4.3

After my January talk about Apache Kafka integration in Structured Streaming, I got an interesting question offline: how to process 2 topics simultaneously with Structured Streaming? The "small" problem was that both topics had different schemas.

Of course, it's much easier to process such topics in 2 different applications. However, if the events from them are somehow related (a data migration merging 2 topics into a single one?) and must be processed together, there is no choice but to use a single application. In this blog post, I will go through 3 different strategies we could use to manage that problem with the JSON message format.

The problem I will try to solve works on datasets generated like this:

  def produceDifferentSchemas(topic1: String, topic2: String, inputMemoryStream: MemoryStream[KafkaRecord]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          inputMemoryStream.addData(Seq(
            KafkaRecord(topic1, System.currentTimeMillis().toString, """{"lower": "a", "upper": "A"}"""),
            KafkaRecord(topic2, System.currentTimeMillis().toString, """{"number": 1, "next_number": 2}"""),
            KafkaRecord(topic1, System.currentTimeMillis().toString, """{"lower": "b", "upper": "B"}"""),
            KafkaRecord(topic2, System.currentTimeMillis().toString, """{"number": 2, "next_number": 3}"""),
            KafkaRecord(topic1, System.currentTimeMillis().toString, """{"lower": "c", "upper": "C"}"""),
            KafkaRecord(topic2, System.currentTimeMillis().toString, """{"number": 3, "next_number": 4}""")
          ))
          Thread.sleep(2000L)
        }
      }
    }).start()
  }

  def produceSameSchemaDifferentTopics(topic1: String, topic2: String, inputMemoryStream: MemoryStream[KafkaRecord]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          inputMemoryStream.addData(Seq(
            KafkaRecord(topic1, "new_order", """{"amount": 30.99, "user_id": 1}"""),
            KafkaRecord(topic2, "new_user", """{"user_id": 3"""),
            KafkaRecord(topic1, "new_order", """{"amount": 20.99, "user_id": 2}"""),
            KafkaRecord(topic1, "new_user", """{"user_id": 3"""),
            KafkaRecord(topic2, "new_order", """{"amount": 90.99, "user_id": 3}""")
          ))
          Thread.sleep(2000L)
        }
      }
    }).start()
  }
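
Both generator functions rely on a KafkaRecord case class and a MemoryStream that aren't shown above. Here is a minimal sketch of the setup I'm assuming, with the case class simply mirroring the topic/key/value columns exposed by the Apache Kafka source (the session and stream names are mine, not part of the original snippets):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.streaming.MemoryStream

  // Simplified view of a Kafka record, keeping only the columns used in this post
  // (the real Kafka source also exposes partition, offset, timestamp, ...)
  case class KafkaRecord(topic: String, key: String, value: String)

  val sparkSession = SparkSession.builder()
    .appName("Two topics, two schemas").master("local[*]").getOrCreate()
  import sparkSession.implicits._

  // the MemoryStream plays the role of the Kafka source in the snippets below
  implicit val sqlContext = sparkSession.sqlContext
  val inputMemoryStream = MemoryStream[KafkaRecord]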

I will use the latter function, produceSameSchemaDifferentTopics, only in the next section of this post. The two remaining strategies will use the former one, produceDifferentSchemas.

Filter on key

Even though I like to think about problems, I don't like to reinvent the wheel. That's why I started my analysis by looking for similar problems and solutions; you will find all the links in the "Read also" section. One of the first solutions I found suggests filtering the data on the key and applying the corresponding processing logic, something like this (for the sake of simplicity, I'm using a MemoryStream imitating the Apache Kafka source schema):

  import org.apache.spark.sql.types._
  // schema of the new_order JSON payload
  val schema = new StructType()
    .add($"amount".double.copy(nullable = false))
    .add($"user_id".int.copy(nullable = false))

  inputMemoryStream.toDS()
    .filter("key = 'new_order'")
    .select($"topic",
      functions.from_json($"value".cast("string"), schema, Map[String, String]()).as("order")
    )
    .filter("order.amount > 25")
    .writeStream.format("console").option("truncate", "false")
    .start().awaitTermination()

This solution comes from the first link I found and it addresses the question of processing multiple topics sharing the same schema. I found the idea of using the key for filtering purposes quite interesting, and that's why I added it here even though it doesn't solve our initial problem. However, be conscious of the limits of this approach. The key won't always represent the event type, especially if you care about ordering and want to put all records of the same entity on the same partition (using a key is the easiest way to do so). Take all this into account, but keep in mind that filtering or message analysis can also be based on something other than the body, like the header information or the key of the message.
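
For reference, the same filter-on-key idea against a real Apache Kafka source would look roughly like this. It reuses the schema defined above; the bootstrap servers and topic names are placeholders I made up, and the key has to be cast because the Kafka source exposes it as binary:

  import org.apache.spark.sql.functions

  val orders = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder brokers
    .option("subscribe", "orders_topic,users_topic") // placeholder topic names
    .load()
    // the key arrives as binary from the Kafka source, hence the cast
    .filter("CAST(key AS STRING) = 'new_order'")
    .select($"topic", functions.from_json($"value".cast("string"), schema).as("order"))
    .filter("order.amount > 25")

The writing part (console sink or anything else) stays the same as in the MemoryStream version.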

Filter on topic

The second solution I found answers the question of what a Dataset contains when a multiple-topics subscription is processed with foreachBatch. Thanks to foreachBatch we can easily separate the processing and, therefore, use 2 different schemas for our inconsistent datasets and process them independently inside one common application. An adapted solution could look like this:

  import org.apache.spark.sql.types._
  val schemaLetters = new StructType()
    .add($"lower".string.copy(nullable = false))
    .add($"upper".string.copy(nullable = false))
  val schemaNumbers = new StructType()
    .add($"number".int.copy(nullable = false))
    .add($"next_number".int.copy(nullable = false))

  inputMemoryStream.toDS()
    .select($"topic", $"value".cast("string"))
    .writeStream
    .foreachBatch((dataset, _) => {
      dataset.persist()
      dataset.filter($"topic" === topic1)
        .select(functions.from_json($"value".cast("string"), schemaLetters).as("value")).select("value.*")
        .selectExpr("CONCAT('letters=', lower, '/',  upper)")
        .show(false)
      dataset.filter($"topic" === topic2)
        .select(functions.from_json($"value".cast("string"), schemaNumbers).as("value")).select("value.*")
        .selectExpr("CONCAT('numbers=', number, '/',  next_number)")
        .show(false)
      dataset.unpersist()
    })
    .start().awaitTermination()

As you can see, with quite simple processing logic, we can work on 2 datasets within a single micro-batch.
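
In a real pipeline, the show(false) calls would probably be replaced by 2 batch writers, one per topic. A sketch of what that could look like, reusing topic1, topic2 and the 2 schemas from the previous snippet; the output and checkpoint paths are made up for the example:

  inputMemoryStream.toDS()
    .select($"topic", $"value".cast("string"))
    .writeStream
    .option("checkpointLocation", "/tmp/two-topics-checkpoint") // made-up path
    .foreachBatch((dataset, _) => {
      dataset.persist()
      // each topic's slice of the micro-batch goes to its own sink, with its own schema
      dataset.filter($"topic" === topic1)
        .select(functions.from_json($"value", schemaLetters).as("value")).select("value.*")
        .write.mode("append").json("/tmp/letters") // made-up path
      dataset.filter($"topic" === topic2)
        .select(functions.from_json($"value", schemaNumbers).as("value")).select("value.*")
        .write.mode("append").json("/tmp/numbers") // made-up path
      dataset.unpersist()
    })
    .start().awaitTermination()

Since foreachBatch hands us plain batch Datasets, any batch sink (JSON, JDBC, ...) can be used for each topic independently.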

Large shared schema

For the next strategy, we could imagine a schema gathering all attributes from both topics and, depending on the source topic, exposing different ones. Thanks to this big schema, with the data processing logic implemented inside the data type, we can keep a single processing pipeline. The solution could also work with SQL's CASE/WHEN operation, but the case class version is more readable, so I will show only that one in the following snippet:

  case class SharedEntryData(lower: Option[String], upper: Option[String],
                             number: Option[Int], next_number: Option[Int]) {
    def topic1Label = s"${lower.orNull} / ${upper.orNull}"
    def topic2Label = s"${number.orNull} / ${next_number.orNull}"
  }
  case class SharedEntry(topic: String, data: SharedEntryData) {
    def toCommonFormat: CommonFormat = {
      if (topic == topic1) {
        CommonFormat(s"letters=${data.topic1Label}")
      } else {
        CommonFormat(s"numbers=${data.topic2Label}")
      }
    }
  }
  case class CommonFormat(label: String)
  import org.apache.spark.sql.types._
  import org.apache.spark.sql.catalyst.ScalaReflection

  inputMemoryStream.toDS()
    .withColumn("shared_entry", functions.from_json($"value".cast("string"),
      ScalaReflection.schemaFor[SharedEntryData].dataType.asInstanceOf[StructType], Map[String, String]()))
    .select($"topic",  $"shared_entry".as("data")).as[SharedEntry]
    .map(sharedSchema => {
      sharedSchema.toCommonFormat
    })
    .writeStream.format("console").option("truncate", "false")
    .start().awaitTermination()

As you can see, SharedEntryData exposes 2 methods that the SharedEntry class uses in the mapping function, depending on the topic name. Of course, this approach also has some drawbacks, like the potential overhead of creating instances for such a large schema.
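
A small side note on the schema derivation: ScalaReflection comes from the Catalyst internals, so if you prefer to stay on the public API, the StructType can also be obtained from the Encoders factory (to my knowledge, both derivations produce an equivalent structure):

  import org.apache.spark.sql.Encoders

  // schema derived from the case class without touching Catalyst internals
  val sharedEntrySchema = Encoders.product[SharedEntryData].schema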

Another, less tricky approach to the whole problem consists of using a separate application that converts both schemas into a common one; in that case your application logic becomes much simpler. On the other hand, you risk losing some properties, like the ordering within a partition of each topic.

For more complex scenarios, like the ones where you need to process the data before writing it, one of the 3 presented strategies could be useful. None of them is perfect though and, depending on your use case, you may end up with another, better adapted approach. If that's the case, I'm really curious about your solution, so feel free to share it in the comments section.