In this Part 1 of the post, I will share some SparkSQL sample code examples in PySpark. These are ready-to-refer code references used quite often when writing any SparkSQL application. Hope you find them useful. Let's start by creating a SparkSession and loading a CSV file into a DataFrame -
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession \
    .builder \
    .appName("LearnSparkSql") \
    .getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
df = sqlc.read.csv('YOUR_INPUT_FILE.csv')
For loading other file formats such as JSON and Parquet, read my other post.
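As a quick sketch, JSON and Parquet files can be loaded in a similar way with the same `spark` session; the file paths below are just placeholders.
# Load a JSON file into a DataFrame (placeholder path)
df_json = spark.read.json('YOUR_INPUT_FILE.json')
# Load a Parquet file into a DataFrame (placeholder path)
df_parquet = spark.read.parquet('YOUR_INPUT_FILE.parquet')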
# Look at the Data
df.show()
# Schema of the data
df.schema
# Print schema of the data
df.printSchema()
# First 10 rows of the data
df.head(10)
# Number of rows in the data
df.count()
# Columns of the data
df.columns
# Create Table from the DataFrame as a SQL temporary view
df.createOrReplaceTempView("datatable")
df2 = spark.sql("SELECT * FROM datatable")
df2.show()
# Global temporary view
df.createGlobalTempView("gtvdatatable")
# Global temporary view is tied to a system preserved database `global_temp`
df3 = spark.sql("SELECT * FROM global_temp.gtvdatatable")
df3.show()
Please note that global temporary views are cross-session - they can be shared among all sessions of the same Spark application.
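As a minimal sketch of that behaviour, a new session created from the same application via newSession() can still query the view through the `global_temp` database; `gtvdatatable` is the view registered above.
# Create a new session within the same Spark application
spark2 = spark.newSession()
# The global temporary view is visible from the new session via the global_temp database
spark2.sql("SELECT * FROM global_temp.gtvdatatable").show()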
from pyspark.sql import Row
# Load a text file and convert each line to a Row.
# Assume the text file contains product id & product name and they are comma separated
lines = sc.textFile("YOUR_INPUT_FILE.txt")
parts = lines.map(lambda l: l.split(","))
productData = parts.map(lambda p: Row(product_id=int(p[0]), product_name=p[1]))
# Infer the schema, and register the DataFrame as a table.
schemaProduct = spark.createDataFrame(productData)
schemaProduct.createOrReplaceTempView("productTable")
# Now we can run SQL over the DataFrame since it is registered as a table.
top100Products = spark.sql("SELECT product_name FROM productTable WHERE product_id <= 100")
top100Products.show()
# Import data types
from pyspark.sql.types import *
# Load a text file and convert each line to a tuple.
# Assume the text file contains product id & product name and they are comma separated
lines = sc.textFile("YOUR_INPUT_FILE.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
product = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "product_id product_name"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaProduct = spark.createDataFrame(product, schema)
# Create a temporary view using the DataFrame
schemaProduct.createOrReplaceTempView("product")
# SQL can be run over DataFrames that have been registered as a table.
df4 = spark.sql("SELECT product_name FROM product")
df4.show()
We will cover more PySpark SQL examples in Part 2 of the post. Other posts you might find interesting to read -