
How To Code SparkSQL in PySpark - Examples Part 1

In this Part 1 of the post, I will write some SparkSQL sample code examples in PySpark. These are ready-to-refer code references used quite often when writing any SparkSQL application. Hope you find them useful. Below are some basic points about SparkSQL -

  • Spark SQL is a query engine built on top of Spark Core.
  • It gives you the flavour of a traditional SQL-like style, although everything runs on Spark.
  • Spark SQL uses a query optimizer called Catalyst to plan and execute queries.
  • Queries can be expressed in SQL or HiveQL and run against various data formats, e.g. JSON, CSV, text, databases, etc.
  • Spark SQL provides us the complete expressive capability of declarative programming with Spark DataFrames (DataFrames sit on top of RDDs - the fundamental data structure of Spark Core).
Now that we know some basic features of SparkSQL, let's see various SparkSQL code samples. Please note we are assuming Spark version 2.x or later.

Create SparkSQL Context:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("LearnSparkSql") \
    .getOrCreate()

# The underlying SparkContext is available from the session if you need the RDD API
sc = spark.sparkContext

# Note: in Spark 2.x+, SparkSession replaces the older SQLContext entry point,
# so no separate SQLContext is needed. (The line `import sqlContext.implicits._`
# is Scala-only and has no PySpark equivalent.)


Read Data:

# Read a CSV file into a DataFrame
df = spark.read.csv('YOUR_INPUT_FILE.csv', header=True, inferSchema=True)

For loading other formats of files - JSON, Parquet, etc. - read my other post.

Explore Data:

# Look at the data
df.show()

# Schema of the data
df.schema

# Print schema of the data
df.printSchema()

# Show first 10 rows of the data
df.show(10)

# Number of rows of data
df.count()

# Columns of the data
df.columns

Create Temporary Tables:

Let's create a temporary table view using the DataFrame so that we can run SQL queries on the table. Please note these temp views are session-scoped tables, i.e. the tables cease to exist once the Spark session terminates.

# Create a SQL temporary view from the DataFrame
df.createOrReplaceTempView("datatable")

df2 = spark.sql("SELECT * FROM datatable")


Create Global View Tables:

If you want to create a table view that continues to exist (unlike temp views) as long as the Spark application is running, create a global temp view:

# Create a global temporary view
df.createGlobalTempView("gtvdatatable")

# Global temporary views are tied to the system-preserved database `global_temp`
df3 = spark.sql("SELECT * FROM global_temp.gtvdatatable")
df3.show()

Please note global temp views are cross-session - such tables can be shared among all sessions of the same Spark application.

Convert RDD to Dataframe if you Already Know the Schema:

Since an RDD carries no schema, we will convert each record into a Row object and build a DataFrame from the resulting RDD of Rows.

from pyspark.sql import Row

# Load a text file and convert each line to a Row.
# Assume the text file contains product id & product name and they are comma separated.
lines = sc.textFile("YOUR_INPUT_FILE.txt")

parts = lines.map(lambda l: l.split(","))

products = parts.map(lambda p: Row(product_id=int(p[0]), product_name=p[1]))

# Infer the schema, and register the DataFrame as a table.
schemaProduct = spark.createDataFrame(products)
schemaProduct.createOrReplaceTempView("productTable")

# Now we can run SQL over the DataFrame since it is registered as a table.
top100Products = spark.sql("SELECT product_name FROM productTable WHERE product_id <= 100")


Convert RDD to Dataframe with User-Defined Schema:

# Import data types
from pyspark.sql.types import *

# Load a text file.
# Assume the text file contains product id & product name and they are comma separated.
lines = sc.textFile("YOUR_INPUT_FILE.txt")
parts = lines.map(lambda l: l.split(","))

# Each line is converted to a tuple.
product = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "product_id product_name"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

schema = StructType(fields)

# Apply the schema to the RDD.
schemaProduct = spark.createDataFrame(product, schema)

# Create a temporary view using the DataFrame.
schemaProduct.createOrReplaceTempView("product")

# SQL can be run over DataFrames that have been registered as a table.
df4 = spark.sql("SELECT product_name FROM product")

We will cover more PySpark SQL examples in Part 2 of the post.

Other posts you might find interesting to read -