Structured Streaming and Apache Kafka Schema Registry

The topic of this post was brought to me by Luan Carvalho, who shared with me an Open Source project connecting Apache Spark to Apache Kafka Schema Registry. Initially, I wanted to focus exclusively on the project, but along the way I discovered some other interesting points.

ABRiS in a nutshell

But let's start with the project you can use to connect Structured Streaming to Apache Kafka Schema Registry. It's called ABRiS and it lets you interact with Apache Avro records through custom from_avro and to_avro functions. They're not the same functions as the ones available in the native Apache Avro module for Apache Spark. ABRiS overrides them in order to pass one of 2 Schema Registry configurations:

def to_avro(column: Column, config: ToAvroConfig): Column
def from_avro(column: Column, config: FromAvroConfig): Column
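
For example, on the reader side, the config and the from_avro call could look like the snippet below. It's a minimal sketch assuming ABRiS 4.x's fluent AbrisConfig builder; the "orders" topic, the local broker and the local Schema Registry address are my own placeholders:

import org.apache.spark.sql.SparkSession
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val spark = SparkSession.builder().appName("abris-demo").master("local[*]").getOrCreate()
import spark.implicits._

// The reader schema is resolved from the latest version registered under the
// TopicNameStrategy subject ("orders-value") of the assumed local Schema Registry
val fromAvroConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("orders")
  .usingSchemaRegistry("http://localhost:8081")

val orders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .load()
  // ABRiS' from_avro decodes the Confluent-framed Avro payload of the value column
  .select(from_avro($"value", fromAvroConfig).as("order"))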

Under-the-hood, it uses the same abstraction to convert the column data from/to Apache Avro. But unlike the native implementation, the ABRiS one interacts with the Schema Registry if the schema id embedded in the record hasn't been resolved yet:

private[abris] case class AvroDataToCatalyst(
  child: Expression,
  abrisConfig: Map[String,Any],
  schemaRegistryConf: Option[Map[String,String]]
) extends UnaryExpression with ExpectsInputTypes {
// ...
  private def decode(payload: Array[Byte]): Any = if (confluentCompliant) {
    decodeConfluentAvro(payload)
  } else {
    decodeVanillaAvro(payload)
  }
  private def decodeConfluentAvro(payload: Array[Byte]): Any  = {

    val buffer = ByteBuffer.wrap(payload)
    if (buffer.get() != ConfluentConstants.MAGIC_BYTE) {
      throw new SerializationException("Unknown magic byte!")
    }

    // reads the 4-byte schema id that follows the magic byte
    val schemaId = buffer.getInt()

    val start = buffer.position() + buffer.arrayOffset()
    val length = buffer.limit() - 1 - ConfluentConstants.SCHEMA_ID_SIZE_BYTES
    decoder = DecoderFactory.get().binaryDecoder(buffer.array(), start, length, decoder)

    // the datum reader is cached per schema id, so the writer schema is downloaded from the registry only once
    val reader = confluentReaderCache.getOrElseUpdate(schemaId, {
      val writerSchema = downloadWriterSchema(schemaId)
      new GenericDatumReader[Any](writerSchema, readerSchema)
    })

    result = reader.read(result, decoder)
    result
  }
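
To make the byte gymnastics above easier to follow, the payload decoded here follows Confluent's wire format: 1 magic byte (0), a 4-byte big-endian schema id and then the Avro binary encoding of the record. Here is a small sketch of that framing, where the schema id 42 and the payload bytes are made up for the illustration:

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Confluent wire format: 1 magic byte + 4-byte schema id + Avro binary payload
def frameConfluentPayload(schemaId: Int, avroBinaryRecord: Array[Byte]): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  out.write(0) // the magic byte checked against ConfluentConstants.MAGIC_BYTE
  out.write(ByteBuffer.allocate(4).putInt(schemaId).array()) // id used to download the writer schema
  out.write(avroBinaryRecord) // the record itself, encoded with the writer schema
  out.toByteArray
}

val framedRecord = frameConfluentPayload(42, Array[Byte](2, 10))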

How to use it? Let's see in the demo:

Dynamic schema is static

Did you notice something strange? ABRiS read the new version of the schema (2) but Apache Spark didn't return the new "vat" column. With the schema registry managing the schemas, we could expect Apache Spark to return the new versions in the next micro-batches. However, it didn't happen because of the micro-batch semantics. Let's check, step by step, what happens when Apache Spark runs a new micro-batch.

The MicroBatchExecution class has a field called logicalPlan. As the name indicates, it represents the logical plan of the execution, and since the MicroBatchExecution is initialized only once, the logical plan is also initialized only once. The only things that change inside it are the data source offsets to read:

class MicroBatchExecution {

  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
// ...

    // Replace sources in the logical plan with data that has arrived since the last batch.
    val newBatchesPlan = logicalPlan transform {
      // For v1 sources.
      case StreamingExecutionRelation(source, output) =>
        newData.get(source).map { dataPlan =>
          val maxFields = SQLConf.get.maxToStringFields
          assert(output.size == dataPlan.output.size,
            s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
              s"${truncatedString(dataPlan.output, ",", maxFields)}")

          val aliases = output.zip(dataPlan.output).map { case (to, from) =>
            Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
          }
          Project(aliases, dataPlan)
        }.getOrElse {
          LocalRelation(output, isStreaming = true)
        }

      // For v2 sources.
      case r: StreamingDataSourceV2Relation =>
        newData.get(r.stream).map {
          case OffsetHolder(start, end) =>
            r.copy(startOffset = Some(start), endOffset = Some(end))
        }.getOrElse {
          LocalRelation(r.output, isStreaming = true)
        }
    }

But as you can see, nothing overrides the already resolved nodes like the Project generated by the call to select(from_avro(...)). Let me show you this in the demo (executed against Apache Spark 2.4.6, which is compatible with ABRiS 4.2.0, but the same applies to 3.0+):
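
As a consequence, to expose the new "vat" column you have to recreate the plan, typically by stopping the query, rebuilding the ABRiS config (which downloads the now-latest reader schema again) and calling select(from_avro(...)) once more. A minimal sketch, reusing the SparkSession, the implicits and the placeholder names from the first snippet:

import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Rebuilding the config downloads the latest reader schema again (version 2 by now)
// and re-running the select produces a freshly resolved logical plan
val refreshedConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("orders")
  .usingSchemaRegistry("http://localhost:8081")

val refreshedOrders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .load()
  .select(from_avro($"value", refreshedConfig).as("order")) // now includes the "vat" field

refreshedOrders.writeStream.format("console").start()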

Alternatives

Before I finish this blog post, let's check the alternatives to ABRiS. The first time I heard about Schema Registry in the context of Structured Streaming was 2 years ago. I found by chance - you should know me already, I get lost on the internet very easily - a blog post written by Souhaib Guitouni. He shared the idea of plugging the Schema Registry into Structured Streaming as a User-Defined Function, as sketched below.
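
I don't want to rewrite his whole solution here, but the idea can be sketched as a UDF that delegates the decoding to Confluent's KafkaAvroDeserializer and returns, for example, the record as a JSON string. The topic name, the registry address and the JSON conversion are my assumptions, not his exact implementation:

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.functions.udf
import scala.collection.JavaConverters._

// Created lazily on every executor; the deserializer caches schemas by id,
// so the Schema Registry is not called for every single record
object RegistryDeserializer extends Serializable {
  @transient lazy val deserializer: KafkaAvroDeserializer = {
    val avroDeserializer = new KafkaAvroDeserializer()
    avroDeserializer.configure(Map("schema.registry.url" -> "http://localhost:8081").asJava, false)
    avroDeserializer
  }
}

// The UDF returns the decoded record as JSON; downstream it can be parsed with from_json
val deserializeValue = udf { bytes: Array[Byte] =>
  RegistryDeserializer.deserializer
    .deserialize("orders", bytes)
    .asInstanceOf[GenericRecord]
    .toString // GenericRecord.toString renders the record as JSON
}

The UDF can then be applied on the Kafka value column, e.g. with select(deserializeValue($"value").as("json")), at the cost of losing the strongly typed columns you get with ABRiS' from_avro.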

If you use Schema Registry with Databricks, you can read the schema definitions as easily as with ABRiS. Simply call to_avro/from_avro with the Schema Registry address as a parameter. Below you can find an example from the Databricks documentation linked in the Further reading section:

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

As you can see, there is no single way to use Schema Registry with Structured Streaming. If you are running your jobs on Databricks, you can do it with the native from_avro/to_avro methods. If not, you can use one of the Open Source solutions like ABRiS.