Usually you can define a schema for your input data with the .schema(...) function of Apache Spark SQL:
val ordersFromJson = testedSparkSession.read.schema(orderSchema)
  .json(ordersToAddFile)
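The orderSchema used above is not shown in the snippet; a minimal sketch of what such a definition could look like (the field names here are illustrative assumptions):

import org.apache.spark.sql.types._

// Hypothetical schema for the orders JSON file read above;
// the field names are assumptions for illustration only
val orderSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("amount", DoubleType),
  StructField("creationDate", TimestampType)
))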
However, if you apply this logic to the Apache Kafka source, you will encounter an exception like this one:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Kafka source has a fixed schema and cannot be set with a custom one
  at scala.Predef$.require(Predef.scala:277)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:67)
This fixed schema is composed of:
def kafkaSchema: StructType = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))
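You can see this fixed schema for yourself by loading a Kafka topic without calling .schema(...) and printing what the source exposes; a minimal sketch, assuming a broker on localhost:9092 and a topic called visits:

// A minimal sketch, assuming a local broker and a "visits" topic
val dataFrame = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "visits")
  .load()

// Prints the fixed schema listed above: key, value, topic,
// partition, offset, timestamp, timestampType
dataFrame.printSchema()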
That's why, if you need to map the value into some specific data structure, you can use an extraction function like functions.from_json($"value", Visit.Schema), which is able to convert the JSON string into the schema you need:
val query = dataFrame.selectExpr("CAST(value AS STRING)")
  .select(functions.from_json($"value", Visit.Schema).as("data"))
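Put together, a self-contained sketch could look like the following. Visit.Schema is not shown above, so the StructType below is an assumed stand-in matching a simple JSON payload:

import org.apache.spark.sql.functions
import org.apache.spark.sql.types._
import sparkSession.implicits._

// Assumed stand-in for Visit.Schema: a simple JSON payload
// with a user id and the visited page
val visitSchema = StructType(Seq(
  StructField("userId", LongType),
  StructField("page", StringType)
))

val query = dataFrame.selectExpr("CAST(value AS STRING)")
  .select(functions.from_json($"value", visitSchema).as("data"))
  // Flatten the extracted struct back into top-level columns
  .select($"data.userId", $"data.page")

Note that the CAST(value AS STRING) step matters: the Kafka source delivers the value column as BinaryType, and from_json expects a string column.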