How to Process, Handle or Produce Kafka Messages in PySpark?



In this post, we will see how to process, handle or produce Kafka messages in PySpark. We will cover the following scenarios:

  • Consuming Kafka Messages in PySpark
  • Sending Messages to Kafka from PySpark
  • Using PySpark as Consumer
  • Using PySpark as Producer
  • Using PySpark Both as a Consumer and a Producer
Sections 1-3 cover Spark Structured Streaming; Section 4 covers Spark Streaming. We will look only at the most crucial part of the code, not the entire Kafka PySpark application, which will differ from use case to use case.

1. PySpark as Consumer - Read and Print Kafka Messages:

Assumptions -

  • You already know how to import the modules, write the Spark configuration, etc., so we will skip that part.
 


from pyspark import SparkConf
from pyspark.sql import SparkSession

#######################
# CODE FOR CONF SKIPPED
#######################

# The Kafka source needs the spark-sql-kafka-0-10 package on the classpath
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Create the streaming DataFrame (Structured Streaming Kafka source)
kafkaStream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "<HOSTNAME:PORT>") \
        .option("subscribe", "TOPIC1") \
        .load()

Let's say we want to print the Kafka messages. The code below prints each new batch of rows to the console as it arrives (specified by outputMode("append")). outputMode("complete") is supported only when the query contains a streaming aggregation, so it cannot be used to print the raw messages.


query = kafkaStream \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

query.awaitTermination()
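
Kafka delivers the key and value columns as binary, so the console output is easier to read once they are cast to strings. The following is a minimal sketch of that idea, assuming an existing SparkSession is passed in as `spark`; the helper names `kafka_source_options` and `print_kafka_messages` are hypothetical and exist only for this illustration.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build the option dict for Spark's Kafka source (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def print_kafka_messages(spark, options):
    """Read a topic as a stream and print key/value as strings to the console."""
    stream = (
        spark.readStream
        .format("kafka")
        .options(**options)
        .load()
        # key and value arrive as binary; cast them so they are readable
        .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    )
    return stream.writeStream.outputMode("append").format("console").start()
```

A call such as `print_kafka_messages(spark, kafka_source_options("<HOSTNAME:PORT>", "TOPIC1")).awaitTermination()` would then keep the query running until it is stopped.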

**"outputMode" has the following options:**

  • append: Only the new rows in the streaming DataFrame/Dataset are written
  • complete: All the rows in the streaming DataFrame/Dataset are written
  • update: Only the rows that were updated in the streaming DataFrame/Dataset are written
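
To make the difference between the modes concrete, here is a small sketch in which a running count per message value (an aggregation) is printed in "complete" mode, while a pass-through stream falls back to "append". The function names are hypothetical, and `kafka_df` is assumed to be a streaming DataFrame loaded from the Kafka source.

```python
def pick_output_mode(has_aggregation):
    """Return an output mode that suits the shape of the query."""
    # "complete" is only valid with a streaming aggregation;
    # a plain pass-through stream has to use "append"
    return "complete" if has_aggregation else "append"


def count_messages_per_value(kafka_df):
    """Running count of messages per distinct payload (an aggregation)."""
    return (
        kafka_df
        .selectExpr("CAST(value AS STRING) AS value")
        .groupBy("value")
        .count()
    )


def start_console_query(df, has_aggregation):
    """Start a console query using the mode chosen above."""
    return (
        df.writeStream
        .outputMode(pick_output_mode(has_aggregation))
        .format("console")
        .start()
    )
```

For example, `start_console_query(count_messages_per_value(kafka_df), has_aggregation=True)` would reprint the full table of counts on every trigger.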
   

2. PySpark as Both Consumer & Producer - Send Streaming Data to Kafka:

Assumptions -

  • You are reading streaming data from a Kafka topic
  • Then you are processing the data and creating some output (in the form of a DataFrame) in PySpark
  • And then you want to stream that output back to another Kafka topic
We skip the "Consuming Kafka Messages" part, as we have seen how to do it in Section 1.


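# yourDF must expose a "value" column (string or binary); "key" is optional
query = yourDF \
        .writeStream \
        .format("kafka") \
        .option("checkpointLocation", "/SOME_DIR") \
        .option("kafka.bootstrap.servers", "HOST1:PORT1,HOST2:PORT2,HOST3:PORT3") \
        .option("topic", "TOPIC1") \
        .start()
# "startingOffsets" is a source option: set it on readStream, not on writeStream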
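
The Kafka sink looks for a column named value (and optionally key), so a processed DataFrame usually needs reshaping before it is written. One possible way to do that, sketched here under the assumption that each row should be sent as a JSON string, is shown below; `kafka_sink_exprs` and `stream_to_kafka` are hypothetical names, and `key_column` is whichever column you want to use as the message key.

```python
def kafka_sink_exprs(key_column=None):
    """SQL expressions that turn an arbitrary row into Kafka key/value columns."""
    # Serialize the whole row as one JSON string in the "value" column
    exprs = ["to_json(struct(*)) AS value"]
    if key_column is not None:
        # Optional message key taken from an existing column
        exprs.insert(0, f"CAST(`{key_column}` AS STRING) AS key")
    return exprs


def stream_to_kafka(df, bootstrap_servers, topic, checkpoint_dir, key_column=None):
    """Write a streaming DataFrame to a Kafka topic as JSON messages."""
    return (
        df.selectExpr(*kafka_sink_exprs(key_column))
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Building the expressions in a separate function keeps the reshaping step easy to check without a running cluster.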

 

3. PySpark as Producer - Send Static Data to Kafka:

Assumptions -

  • You are reading a file (local, HDFS, S3, etc.) or any other form of static data
  • Then you are processing the data and creating some output (in the form of a DataFrame) in PySpark
  • And then you want to write the output to another Kafka topic
 


# yourDF must expose a "value" column (string or binary); "key" is optional
yourDF \
 .write \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "HOST1:PORT1,HOST2:PORT2,HOST3:PORT3") \
 .option("topic", "TOPIC1") \
 .save()
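
After a batch write like this, it can be handy to read the topic back as a static DataFrame to check what actually landed. The sketch below assumes an existing SparkSession passed in as `spark`; the helper names `kafka_batch_read_options` and `read_topic_as_batch` are hypothetical.

```python
def kafka_batch_read_options(bootstrap_servers, topic):
    """Options for a one-off (non-streaming) read of a whole topic."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Read everything currently in the topic, then stop
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    }


def read_topic_as_batch(spark, options):
    """Load the topic as a static DataFrame with readable key/value columns."""
    return (
        spark.read
        .format("kafka")
        .options(**options)
        .load()
        .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    )
```

Calling `read_topic_as_batch(spark, kafka_batch_read_options("HOST1:PORT1", "TOPIC1")).show(truncate=False)` would print the messages that were written.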

All of the above examples use Spark Structured Streaming (the DataFrame-based API). The example below uses the older Spark Streaming (DStream) API instead.

4. PySpark as Both Consumer & Producer - Spark Streaming (DStream):

Assumptions

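  • This example uses Spark Streaming (DStreams), not Structured Streaming
  • It relies on pyspark.streaming.kafka, which is available only in Spark 2.x (the module was removed in Spark 3.0)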
 


# Imports
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # Spark 2.x only
from kafka import KafkaProducer  # from the kafka-python package

 


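# Code block to send/produce messages to Kafka

def sendMessage(rdd):
    # collect() pulls the whole batch to the driver, so keep batches small
    msgs = rdd.collect()
    for rec in msgs:
        # KafkaProducer.send() expects bytes unless a value_serializer is set
        producer.send('spark.out', str(rec).encode('utf-8'))
    # Flush once per batch rather than after every record
    producer.flush()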

 


# Main flow

producer = KafkaProducer(bootstrap_servers='<HOSTNAME>:9092')

sc = SparkContext(appName="PySparkKafkaApp")
ssc = StreamingContext(sc, 20)  # 20-second batch interval

# Direct stream: each record arrives as a (key, value) tuple
data = KafkaUtils.createDirectStream(ssc, ["TOPIC1"], {"metadata.broker.list": "<BROKER ADDRESS>"})
data.foreachRDD(sendMessage)

ssc.start()
ssc.awaitTermination()
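
The flow above collects every batch to the driver before sending. For larger batches, a common alternative is to send from the executors, one producer per partition. The following is only a sketch of that variation, not the approach used in this post: `encode_record`, `send_partition` and `send_rdd` are hypothetical names, and `make_producer` is assumed to be a zero-argument function that returns an object with `send`, `flush` and `close` methods (such as a kafka-python KafkaProducer).

```python
def encode_record(rec):
    """Turn a record into bytes, which is what the producer expects by default."""
    return str(rec).encode("utf-8")


def send_partition(records, make_producer, topic="spark.out"):
    """Send one partition's records using a producer created on the executor."""
    producer = make_producer()
    sent = 0
    for rec in records:
        producer.send(topic, encode_record(rec))
        sent += 1
    # Flush once per partition, then release the connection
    producer.flush()
    producer.close()
    return sent


def send_rdd(rdd, make_producer, topic="spark.out"):
    """foreachRDD callback: fan the work out per partition instead of collect()."""
    rdd.foreachPartition(
        lambda records: send_partition(records, make_producer, topic)
    )
```

It could be wired in with `data.foreachRDD(lambda rdd: send_rdd(rdd, lambda: KafkaProducer(bootstrap_servers='<HOSTNAME>:9092')))`. The producer is created inside the partition function because a producer object holds network connections and cannot be shipped from the driver to the executors.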

  Hope this post helps.  
