How to create a schema for Apache Kafka source in Structured Streaming?

Usually you can define a schema on your input data from .schema(...) function of Apache Spark SQL:

val ordersFromJson = testedSparkSession.read.schema(orderSchema)
  .json(ordersToAddFile)

However, if you apply this logic to Apache Kafka source, you will encounter an exception like that:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Kafka source has a fixed schema and cannot be set with a custom one
    at scala.Predef$.require(Predef.scala:277)
      at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:67) 

This fixed schema is composed of:

  def kafkaSchema: StructType = StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType),
    StructField("timestamp", TimestampType),
    StructField("timestampType", IntegerType)
  ))

That's why, if you need to map the value it into some specific data structure, you can use of on extraction functions like functions.from_json($"value", Visit.Schema) that is able to convert the JSON string into the schema you need:

val query = dataFrame.selectExpr("CAST(value AS STRING)")
                                     .select(functions.from_json($"value", Visit.Schema).as("data"))