
How To Code a PySpark Cassandra Application?



This post walks through sample code for a simple PySpark application. It reads data from a CSV file and registers it as a Spark table that can be queried with SQL.

1. PySpark Code:

 


import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# This is only needed on Windows
def setEnv():

    # Replace with your Spark dir on Windows
    os.environ['SPARK_HOME'] = 'D:\\Work\\spark-2.3.0-bin-hadoop2.7'
    os.environ['HADOOP_HOME'] = 'D:\\Work\\spark-2.3.0-bin-hadoop2.7'

    print(os.environ['SPARK_HOME'])

def main():

    # If running on Windows, set the env variables before creating the session; on Linux skip
    if os.name == 'nt':
        setEnv()

    spark = SparkSession.builder.master("local[*]").appName("simple_pyspark_app").getOrCreate()
    sc = spark.sparkContext

    # Load customers.csv - sample data below
    # 12345, Chandler, Bing, [email protected], california
    CUSTOMERS_DATA = '/sampledata/customers.csv'

    # Define the schema, corresponding to a line in the CSV data file for Customer
    customers_schema = StructType([
        StructField('customer_id', IntegerType(), nullable=True),
        StructField('customer_fname', StringType(), nullable=True),
        StructField('customer_lname', StringType(), nullable=True),
        StructField('customer_email', StringType(), nullable=True),
        StructField('customer_city', StringType(), nullable=True)])

    # Load the data with the schema applied
    customers_df = spark.read.csv(path=CUSTOMERS_DATA, schema=customers_schema).cache()

    # Register the DataFrame as a temporary view so it can be queried with SQL
    customers_df.createOrReplaceTempView("customers")

    spark.sql("select count(*) from customers").show()

    sc.stop()

if __name__ == '__main__':
    main()
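Before running the full Spark job, the schema-to-row mapping can be sanity-checked in plain Python with the standard `csv` module (no Spark needed). `parse_customer` below is a hypothetical helper that mirrors the fields of `customers_schema` above; it is not part of the Spark application itself:

```python
import csv
import io

# Field names mirror customers_schema in the Spark job above.
FIELDS = ['customer_id', 'customer_fname', 'customer_lname',
          'customer_email', 'customer_city']

def parse_customer(line):
    """Parse one CSV line into a dict, casting customer_id to int
    the way IntegerType() would in the Spark schema."""
    values = next(csv.reader(io.StringIO(line), skipinitialspace=True))
    row = dict(zip(FIELDS, values))
    row['customer_id'] = int(row['customer_id'])
    return row

# Same sample row as in the comment above
sample = '12345, Chandler, Bing, [email protected], california'
print(parse_customer(sample))
```

This is a quick way to confirm that the column order and types in the schema match the file before pointing `spark.read.csv` at real data.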


 

2. How to Run the PySpark Code:

  • Go to the Spark bin directory
  • Run the command below to execute the PySpark application

    **./spark-submit <Scriptname_with_path.py>**
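For the Cassandra part of the title, the job needs the DataStax spark-cassandra-connector on the classpath at submit time. A sketch of the command, assuming the Scala 2.11 / Spark 2.3 connector coordinates and a local Cassandra node (the script name is a placeholder; adjust the connector version and host for your setup):

```shell
# Hypothetical example: pick the connector version matching your Spark/Scala build
./spark-submit \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  your_pyspark_app.py
```

With the connector loaded, a DataFrame can be persisted to Cassandra via the connector's data source, e.g. `df.write.format("org.apache.spark.sql.cassandra").options(table="customers", keyspace="mykeyspace").save()` (the table and keyspace names here are placeholders).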
    
