This post explains how to read Kafka JSON data in Spark Structured Streaming. The Spark Kafka data source exposes the following underlying schema:
|key|value|topic|partition|offset|timestamp|timestampType|
The actual data arrives in JSON format and resides in the "value" column. Spark does not understand the serialization or the format; to Spark, the value is just a sequence of bytes, so the data needs to be parsed first. There are two ways we can parse the JSON data. Let's say you read "topic1" from Kafka in Structured Streaming as below:
val kafkaData = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
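Because value arrives as bytes, a quick way to eyeball the raw payload before any parsing is to cast it to a string. A minimal sketch (the rawJson name is just illustrative):

// Cast the binary Kafka value to a string to inspect the raw JSON payload
val rawJson = kafkaData.selectExpr("CAST(value AS STRING) AS json")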
The first way is to declare a schema and parse the whole message with from_json:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._

val dataSchema = StructType(
  List(
    StructField("column1", YOUR_COLUMN_TYPE, true),
    StructField("column2", YOUR_COLUMN_TYPE, true)
  )
)

val parsedData = kafkaData.select(from_json($"value".cast(StringType), dataSchema).as("data"))
display(parsedData)
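To make that concrete, here is a minimal end-to-end sketch that writes the parsed stream to the console sink. The schema here is an assumption for illustration only: it supposes each message looks like {"column1": "a", "column2": 1}.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

// Hypothetical schema, assuming messages of the form {"column1": "a", "column2": 1}
val exampleSchema = StructType(List(
  StructField("column1", StringType, true),
  StructField("column2", IntegerType, true)
))

val parsed = kafkaData
  .select(from_json($"value".cast(StringType), exampleSchema).as("data"))
  .select("data.*") // flatten the struct into top-level columns

// Write the parsed rows to the console for debugging
val query = parsed.writeStream
  .format("console")
  .outputMode("append")
  .start()

Selecting "data.*" flattens the struct returned by from_json so that column1 and column2 become top-level columns.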
The second way is to use get_json_object and extract each field by its JSON path:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = List("column1", "column2")
val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c").as(c))
val parsedData = kafkaData.select(exprs: _*)
display(parsedData)
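For a quick sanity check of the same JSON path expressions outside the stream, you can run them against a static DataFrame; the sample record below is purely illustrative:

import sparkSession.implicits._
import org.apache.spark.sql.functions.get_json_object

// A static one-row DataFrame standing in for the Kafka value column
val sample = Seq("""{"column1":"a","column2":1}""").toDF("value")

sample.select(
  get_json_object($"value", "$.column1").as("column1"),
  get_json_object($"value", "$.column2").as("column2")
).show()

Note that get_json_object returns every extracted field as a string and evaluates the JSON once per path expression, whereas from_json parses each message once and yields typed columns, so from_json is typically the better choice when the full schema is known.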