How To Code SparkSQL in PySpark - Examples Part 1



In this Part 1 of the post, I will walk through some SparkSQL sample code examples in PySpark. These are ready-to-refer code snippets that come up quite often when writing any SparkSQL application. Hope you find them useful. Below are some basic points about SparkSQL -

  • Spark SQL is a query engine built on top of Spark Core.
  • It gives you the flavour of a traditional SQL-like style, although everything runs on Spark.
  • Spark SQL uses a query optimizer called Catalyst to optimize and execute queries.
  • Queries can be expressed using SQL or HiveQL and run against various data formats, e.g. JSON, CSV, text, databases etc.
  • Spark SQL gives us the complete expressive capability of declarative programming with Spark DataFrames (DataFrames sit on top of RDDs - the fundamental data structure of Spark Core). See the sketch right after this list.
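
To make that last point concrete, here is a minimal sketch of the same query written once with the declarative DataFrame API and once as SQL. The SparkSession spark, the DataFrame df and the column names are illustrative placeholders; the sections below show how to actually create them.


# Declarative DataFrame style
df.filter(df["product_id"] <= 100).select("product_name").show()

# Equivalent SQL style, after registering the DataFrame as a temporary view
df.createOrReplaceTempView("products")
spark.sql("SELECT product_name FROM products WHERE product_id <= 100").show()
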
Now that we know some basic features of SparkSQL, let's see various SparkSQL code samples. Please note we are assuming Spark version 2.x or above.

Create SparkSQL Context:


from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .appName("LearnSparkSql") \
    .getOrCreate()

sc = spark.sparkContext

# SQLContext is kept for older code paths; in Spark 2.x+ the SparkSession itself is the entry point
sqlc = SQLContext(sc)
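
The builder also accepts Spark SQL configuration at start-up if you need to tune it. A minimal sketch - the property and value below are purely illustrative:


spark = SparkSession \
    .builder \
    .appName("LearnSparkSql") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()
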


   

Read Data:


df = spark.read.csv('YOUR_INPUT_FILE.csv')

For loading other file formats - JSON, Parquet etc., read my other post.
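
For CSV specifically, you will usually want a couple of reader options. A minimal sketch, assuming the input file has a header row:


# Treat the first line as column names and let Spark infer column types
df = spark.read.csv('YOUR_INPUT_FILE.csv', header=True, inferSchema=True)
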

Explore Data:


# Look at the data
df.show()

# Schema of the data (attribute, not a method)
df.schema

# Print schema of the data
df.printSchema()

# Show first 10 rows of the data
df.head(10)

# Number of rows of data
df.count()

# Columns of the data (attribute, not a method)
df.columns
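
A few more handy calls while exploring - a minimal sketch; the column names in the select are placeholders:


# Summary statistics (count, mean, stddev, min, max) for numeric columns
df.describe().show()

# Column names together with their data types
df.dtypes

# Select a subset of columns and peek at a few rows
df.select("product_id", "product_name").show(5)
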

Create Temporary Tables:

Let's create a temporary table view from the DataFrame so that we can run SQL queries against it. Please note these temp views are session-scoped, i.e. the table ceases to exist once the Spark session terminates.


# Create a SQL temporary view from the DataFrame
df.createOrReplaceTempView("datatable")

df2 = spark.sql("SELECT * FROM datatable")
df2.show()
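
If you want to clean up the view before the session ends, the catalog API can drop it explicitly - a minimal sketch:


# Drop the temporary view once it is no longer needed
spark.catalog.dropTempView("datatable")
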


 

Create Global View Tables:

If you want a table view that continues to exist (unlike temp views) as long as the Spark application is running, create a global temp view instead.


# Global temporary view
df.createGlobalTempView("gtvdatatable")

# Global temporary views are tied to the system-preserved database `global_temp`
df3 = spark.sql("SELECT * FROM global_temp.gtvdatatable")
df3.show()


Please note global temp views are cross-session, i.e. they can be shared among all sessions of the same Spark application.
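
To see the cross-session behaviour in action, query the view from a fresh session of the same application - a minimal sketch:


# A brand-new session can still see the global temp view
spark.newSession().sql("SELECT * FROM global_temp.gtvdatatable").show()
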

Convert RDD to Dataframe if you Already Know the Schema:

We will convert each record of the RDD into a Row object and let Spark infer the schema from those Rows to build the DataFrame.


from pyspark.sql import Row

# Load a text file and convert each line to a Row.
# Assume the text file contains product id & product name and they are comma separated
lines = sc.textFile("YOUR_INPUT_FILE.txt")

parts = lines.map(lambda l: l.split(","))

productData = parts.map(lambda p: Row(product_id=int(p[0]), product_name=p[1]))

# Infer the schema, and register the DataFrame as a table.
schemaProduct = spark.createDataFrame(productData)

schemaProduct.createOrReplaceTempView("productTable")

# Now we can run SQL over the DataFrame since it is registered as a table.
top100Products = spark.sql("SELECT product_name FROM productTable WHERE product_id <= 100")

top100Products.show()
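
Since the result of spark.sql() is itself a DataFrame, you can also pull the rows back to the driver and access fields by name - a minimal sketch:


# Collect the result to the driver and read fields by name
for row in top100Products.collect():
    print(row.product_name)
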


   

Convert RDD to Dataframe with User-Defined Schema:


# Import data types
from pyspark.sql.types import *

# Load a text file and convert each line to a Row.
# Assume the text file contains product id & product name and they are comma separated
lines = sc.textFile("YOUR_INPUT_FILE.txt")
parts = lines.map(lambda l: l.split(","))

# Each line is converted to a tuple.
product = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "product_id product_name"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

schema = StructType(fields)

# Apply the schema to the RDD.
schemaProduct = spark.createDataFrame(product, schema)

# Create a temporary view using the DataFrame.
schemaProduct.createOrReplaceTempView("product")

# SQL can be run over DataFrames that have been registered as a table.
df4 = spark.sql("SELECT product_name FROM product")

df4.show()
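
In the example above every field is declared as StringType for simplicity. If you want properly typed columns, spell out the StructFields and cast while building the tuples - a minimal sketch, assuming product_id is an integer:


from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Cast product_id to int while building the tuples
typed_product = parts.map(lambda p: (int(p[0]), p[1].strip()))

typed_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True)
])

typedProductDF = spark.createDataFrame(typed_product, typed_schema)
typedProductDF.printSchema()
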


We will cover more PySpark SQL examples in Part 2 of the post. Other posts you might find interesting to read -