Sample Code for Spark Cassandra Scala Application



This is step-by-step sample code for a Spark Cassandra application written in Scala. It shows how to connect to a Cassandra cluster from a Spark application, write data into a Cassandra table, and read it back, using the Spark RDD API.
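
To build and run code like this, the Spark Cassandra Connector must be on the classpath. Below is a minimal build.sbt sketch; the Spark, Scala, and connector versions are illustrative assumptions and should be aligned with your cluster.

// build.sbt (versions are illustrative; match them to your Spark installation)
name := "spark-cassandra-sample"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "2.4.8" % "provided",
  "org.apache.spark"   %% "spark-sql"                 % "2.4.8" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.5.2"
)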

Scala Code for Spark Cassandra:


// Author - gankrin.org
// Write into Cassandra Database 
// Read from Cassandra Database
// Spark Scala program

package com.gankrin

// Basic Spark imports
import org.apache.spark.{SparkConf, SparkContext}

// Spark Cassandra Connector imports
// (provide saveToCassandra on RDDs and cassandraTable on SparkContext)
import com.datastax.spark.connector._

// Date import for processing logic
import java.util.Date

object AppSparkCassandra {

  def main(args: Array[String]): Unit = {

    // Configuration
    val sparkConf = new SparkConf()
                    .setAppName("WordCountApp")
                    .setMaster("local")  // Spark master
                    .set("spark.cassandra.connection.host", "192.168.133.10")
                    .set("spark.cassandra.auth.username", "testuser")
                    .set("spark.cassandra.auth.password", "password")

    val sc = new SparkContext(sparkConf)

    // Read the input text file
    val data = sc.textFile("Give_the_path_of_your_text_file")

    // Split the lines into words, count each word, and tag each count with the current time
    val wordCounts = data.flatMap(_.split(" "))            // split lines into words
              .filter(w => w.length() > 0)                 // remove empty words
              .map(w => (w, 1L)).reduceByKey(_ + _)        // count occurrences of each word
              .map({ case (w, c) => (w, c, new Date().getTime) }) // add current time

    // Each element of the RDD now has the structure:
    // (word, word count, timestamp)

    // Print the first 5 elements of the RDD to verify the contents
    wordCounts.take(5).foreach(println)

    
    // The keyspace and table below must already exist in Cassandra
    // Keyspace name --> key_space
    // Table name    --> word_count_table
    // Columns       --> word, word_count, timestamp
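
    // For reference, a hypothetical CQL schema matching the names above
    // (illustrative only; not part of the original post):
    //
    //   CREATE KEYSPACE IF NOT EXISTS key_space
    //     WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
    //
    //   CREATE TABLE IF NOT EXISTS key_space.word_count_table (
    //     word       text PRIMARY KEY,
    //     word_count bigint,
    //     timestamp  bigint
    //   );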

    //======== Write the RDD to the Cassandra table
    // saveToCassandra comes from com.datastax.spark.connector._;
    // the tuple fields are mapped to the listed columns in order
    wordCounts.saveToCassandra("key_space", "word_count_table",
      SomeColumns("word", "word_count", "timestamp"))

    //========= Read from the Cassandra table
    // Read the same data back from the same Cassandra table
    val rdd1 = sc.cassandraTable("key_space", "word_count_table")
    rdd1.take(10).foreach(println)
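
    // A filter can also be pushed down to Cassandra as a CQL WHERE clause
    // instead of filtering in Spark; the column name assumes the schema above
    val filtered = sc.cassandraTable("key_space", "word_count_table")
      .where("word = ?", "spark")
    filtered.collect().foreach(println)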

    sc.stop()

    System.exit(0)
  }
}
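
The same table can also be read through the DataFrame API instead of the RDD API. Below is a minimal sketch of that approach, assuming Spark 2.x with SparkSession and the same keyspace and table names used above; it is an illustration, not part of the original listing.

// Reading the same Cassandra table through the DataFrame API
package com.gankrin

import org.apache.spark.sql.SparkSession

object AppSparkCassandraDF {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("WordCountDFApp")
      .master("local")
      .config("spark.cassandra.connection.host", "192.168.133.10")
      .config("spark.cassandra.auth.username", "testuser")
      .config("spark.cassandra.auth.password", "password")
      .getOrCreate()

    // The connector registers "org.apache.spark.sql.cassandra" as a data source
    val df = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "key_space", "table" -> "word_count_table"))
      .load()

    df.show(10)

    spark.stop()
  }
}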


   

Additional Reads -

Best Practices for Dependency Problem in Spark

https://github.com/datastax