// Step-by-step sample of a Spark application in Scala that writes to and reads from Cassandra.
// It shows how to connect to Cassandra, write word counts into a table, and read them back using the Spark RDD API.
// Author - gankrin.org
// Write into Cassandra Database
// Read from Cassandra Database
// Spark Scala program
package com.gankrin
import java.io.FileReader
import java.util.Properties
import scala.collection.JavaConversions._
// Basic Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
// Spark SQL Cassandra imports
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._
// Cassandra Java driver imports
import com.datastax.driver.core.Cluster
// Date import for processing logic
import java.util.Date
/**
 * Sample Spark application that counts the words of a text file, writes the counts to a
 * Cassandra table through the Spark Cassandra Connector, and then reads the table back.
 *
 * Usage: AppSparkCassandra [inputPath]
 * When `inputPath` is omitted, the placeholder path `DefaultInputPath` is used and must be edited.
 *
 * The target schema must already exist in Cassandra: keyspace `Keyspace`, table `Table`,
 * with columns for the word, its count and a timestamp.
 */
object AppSparkCassandra {

  /** Placeholder input location used when no path is passed on the command line. */
  private val DefaultInputPath = "Give_the_path_of_your_text_file"

  // Keyspace / table written to and read back from.
  // NOTE(review): the connector looks these up by their stored names; if the schema was
  // created with unquoted CQL identifiers the stored names are lowercase — confirm.
  private val Keyspace = "Key_Space"
  private val Table = "Word_count_table"

  def main(args: Array[String]): Unit = {
    // Each setting is applied only if not already provided (e.g. spark-submit --conf or
    // -D system properties, which `new SparkConf()` loads), so these act as defaults.
    // NOTE(review): credentials are hard-coded sample values; do not ship real ones this way.
    val sparkConf = new SparkConf()
      .setAppName("WordCountApp")
      .setIfMissing("spark.master", "local") // Spark master for local runs
      .setIfMissing("spark.cassandra.connection.host", "192.168.133.10")
      .setIfMissing("spark.cassandra.auth.username", "testuser")
      .setIfMissing("spark.cassandra.auth.password", "password")

    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val sc = new SparkContext(sparkConf)
    try {
      // Taken once on the driver so every row of this run carries the same timestamp;
      // a per-record `new Date()` would be evaluated on executors at different moments.
      val runTimestamp = new Date().getTime

      // textFile already yields one String per line: split lines into words on any
      // whitespace, drop empty tokens, count per word, then tag each row with the run time.
      // Row shape: (word, count, timestamp).
      val wordCounts = sc.textFile(inputPath)
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .map(word => (word, 1L))
        .reduceByKey(_ + _)
        .map { case (word, count) => (word, count, runTimestamp) }
        .cache() // consumed twice below (preview + save); avoid recomputing the lineage

      // Print the first 5 rows to verify the contents.
      wordCounts.take(5).foreach(println)

      // Write to Cassandra. Tuple fields are bound positionally to the listed columns, so
      // naming them avoids relying on the connector's default column ordering.
      // NOTE(review): lowercase names assume the columns were created unquoted — confirm.
      wordCounts.saveToCassandra(Keyspace, Table, SomeColumns("word", "word_count", "timestamp"))

      // Read the same table back and print a sample of rows.
      val fromCassandra = sc.cassandraTable(Keyspace, Table)
      fromCassandra.take(10).foreach(println)
    } finally {
      // Release Spark resources even if a job above fails.
      sc.stop()
    }
    // Kept from the original: explicit exit after a successful run.
    System.exit(0)
  }
}