Apache spark + cassandra: Basic steps to install and configure cassandra and use it with apache spark with example

Table of contents
Reading Time: 3 minutes

spark-cassandra

To build an application using apache spark and cassandra you can use the datastax spark-cassandra-connector to communicate with spark. Before we are going to communicate with spark using connector we should know how to configure cassandra. So following are prerequisite to run example smoothly.

Following steps to install and configure cassandra

If you are new to cassandra first we nee to install cassandra on our local machine.

$wget http://downloads.datastax.com/community/dsc-cassandra-2.1.6-bin.tar.gz

next step is if you want to configure cassandra instance you can make changes in /conf/cassandra.yaml file. Some of important configurations are below:

cluster_name: ‘test_1node’

you can set the same cluster name its shows in which cluster node is connected

 listen_address: localhost

Right now we are using local instance otherwise we use ip address by that other node in cluster can communicate with this node using listen_address

rpc_address: localhost

rpc_address specify the ip or host name through which client communicate

pc_port: 9160

rpc_port for thrift protocol to listen for client

Now extract file using following command

$tar xzvf dsc-cassandra-2.1.6-bin.tar.gz

now run the cassandra instance on your local machine with the help on following command

$./bin/cassandra

when your cassandra instance will run correctly its shown the following info log on console

Node localhost/127.0.0.1 state jump to normal

Now your cassandra instance is running smoothly. Next step to make a keyspace and table, for that we use cqlsh so run cqlsh by following command

$./bin/cqlsh

Now we make a keyspace and table for this example we make table in with which show which spark version support connector version if you not incompatible version its throw exception

First we make keyspace as below

CREATE KEYSPACE spark WITH replication = {‘class’: ‘SimpleStrategy’, ‘replication_factor’: 1};

Now we make a table in this keyspace using following command

cqlsh> CREATE TABLE spark.try (spark float PRIMARY KEY ,connector float) ;

now your table schema is defined now we insert row of spark version and its compatible connector

cqlsh> INSERT INTO spark.try (spark, connector ) VALUES ( 1.4,1.4);

insert all data like above statement

Now to see data from table simply use command

cqlsh> SELECT * FROM spark.try ;

Now we jump to connect apache spark with our cassandra instance

first we make a sbt project and include depencencies of spark and cassandra-connector

name := "spark_cassandra"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)

No we make a main object for our application as follows

object SparkCassandra extends App {

Now we make a spark configurations for our application as follows

val conf = new SparkConf(true)
.set("spark.cassandra.connection.host","127.0.0.1") .setAppName("cassandra").setMaster("local[*]")
.set("spark.cassandra.connection.native.port", "9042")
.set("spark.cassandra.connection.rpc.port", "9160")

.set(“spark.cassandra.connection.host”,”127.0.0.1″) is used as a connection point to the cassandra cluster by default it is a master node

.setAppName(“cassandra”) is app name of the application which is use by the spark logs and show on the master UI of the cluster

.setMaster(“local[*]”) is use to specify master of the application

.set(“spark.cassandra.connection.native.port”, “9042”) is used to listen CQL native transport from clients

.set(“spark.cassandra.connection.rpc.port”, “9160”) is used by thrift protocol

These are the minimal requirement for connectivity to cassandra , you can also set more advance configuration, changes can be make in above configurations through cassandra.yaml file.

Now make a spark context with these configuration as follows

val sc = new SparkContext(conf)

Now you can directly make a RDD of the cassandra table as follows

val rdd = sc.cassandraTable("spark", "try")

Now you can simply perform any transformations and actions on this RDD, here we simply collect it and print on console

val file_collect=rdd.collect()                     file_collect.map(println(_))

it will give you following result on the console

Screenshot from 2015-06-23 16:32:11

For source code you can go to this repository https://github.com/phalodi/spark-cassandra

before run source code remember to make cassandra instance and start it as mention above

7 thoughts on “Apache spark + cassandra: Basic steps to install and configure cassandra and use it with apache spark with example3 min read

Comments are closed.

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading