DevOps | Cloud | Analytics | Open Source | Programming

Sample Code for Spark Cassandra Scala Application

This is step-by-step sample code for reading from and writing to Cassandra in a Spark application written in Scala. It shows how to connect to Cassandra, write a table from Spark, and read it back, using Spark's RDD API.

Scala Code for Spark Cassandra:

// Author -
// Write into Cassandra Database 
// Read from Cassandra Database
// Spark Scala program

package com.gankrin

// Basic Spark imports
import org.apache.spark.{SparkConf, SparkContext}

// Spark Cassandra connector imports (provides saveToCassandra / cassandraTable)
import com.datastax.spark.connector._

// Used to timestamp each output record
import java.util.Date

// Word-count job that writes its result to a Cassandra table and reads it back.
// The keyspace "key_space" and table "word_count_table" with columns
// (word, word_count, timestamp) must already exist in Cassandra.
object AppSparkCassandra {

  def main(args: Array[String]): Unit = {

    // Spark configuration: local master plus Cassandra connection/auth settings.
    // NOTE(review): host, username and password are placeholders — supply real values.
    val sparkConf = new SparkConf()
      .setAppName("AppSparkCassandra")
      .setMaster("local")                                  // Spark master
      .set("spark.cassandra.connection.host", "127.0.0.1") // Cassandra contact point
      .set("spark.cassandra.auth.username", "testuser")
      .set("spark.cassandra.auth.password", "password")

    val sc = new SparkContext(sparkConf)

    try {
      // Read the input text file.
      val data = sc.textFile("Give_the_path_of_your_text_file")

      // Split the lines into words, drop empties, count per word,
      // then tag each (word, count) pair with the current time.
      val wordCounts = data
        .flatMap(_.split(" "))                                // split into words
        .filter(w => w.nonEmpty)                              // remove empty words
        .map(w => (w, 1L))
        .reduceByKey(_ + _)                                   // count by word
        .map { case (w, c) => (w, c, new Date().getTime) }    // add current time

      // Each element now has the structure: (word, word_count, timestamp)

      // Print the first 5 rows to verify the contents.
      wordCounts.take(5).foreach(println)

      // ======== Write the RDD to the Cassandra table
      wordCounts.saveToCassandra("key_space", "word_count_table",
        SomeColumns("word", "word_count", "timestamp"))

      // ======== Read the same data back from the same Cassandra table
      val rdd1 = sc.cassandraTable("key_space", "word_count_table")
      rdd1.take(5).foreach(println)
    } finally {
      sc.stop() // release Spark resources even if the job fails
    }
  }
}




Additional Read -

Best Practices for Dependency Problem in Spark