To build an application using Apache Spark and Cassandra, you can use the DataStax spark-cassandra-connector to communicate between the two. Before connecting Spark to Cassandra through the connector, we should know how to install and configure Cassandra, so the following prerequisites will let the example run smoothly.
Follow these steps to install and configure Cassandra. If you are new to Cassandra, first we need to install it on our local machine by downloading the tarball:
$wget http://downloads.datastax.com/community/dsc-cassandra-2.1.6-bin.tar.gz
Next, extract the downloaded archive:
$tar xzvf dsc-cassandra-2.1.6-bin.tar.gz
If you want to configure your Cassandra instance, you can make changes in the conf/cassandra.yaml file inside the extracted directory. Some of the important settings are below:
cluster_name: 'test_1node'
All nodes that should join the same cluster must be configured with the same cluster name; it shows which cluster a node belongs to.
listen_address: localhost
Right now we are using a local instance; otherwise we would set the node's IP address here, so that other nodes in the cluster can communicate with this node through listen_address.
rpc_address: localhost
rpc_address specifies the IP address or host name on which the node listens for client connections.
rpc_port: 9160
rpc_port is the port on which the Thrift protocol listens for clients.
Now run the Cassandra instance on your local machine with the following command:
$./bin/cassandra
When your Cassandra instance is running correctly, it shows the following INFO log on the console:
Node localhost/127.0.0.1 state jump to normal
Now your Cassandra instance is running smoothly. The next step is to create a keyspace and a table; for that we use cqlsh, which we start with the following command:
$./bin/cqlsh
Now we create a keyspace and a table. For this example, the table records which connector version supports which Spark version; if you use an incompatible combination, the connector throws an exception. First we create the keyspace:
CREATE KEYSPACE spark WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
Now we create a table in this keyspace using the following command:
cqlsh> CREATE TABLE spark.try (spark float PRIMARY KEY, connector float);
Now that the table schema is defined, we insert a row with a Spark version and its compatible connector version:
cqlsh> INSERT INTO spark.try (spark, connector) VALUES (1.4, 1.4);
Insert the rest of your data with similar statements.
To see the data in the table, simply run:
cqlsh> SELECT * FROM spark.try;
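The purpose of this table can be sketched in plain Scala: a lookup from Spark version to the connector version it is known to work with. The CompatCheck object below is a hypothetical illustration (only the (1.4, 1.4) pair comes from the row inserted above), not part of any library:

```scala
object CompatCheck {
  // Mirrors the rows of spark.try: Spark version -> compatible connector version.
  // Only the (1.4, 1.4) pair is taken from the INSERT above.
  val compat: Map[Float, Float] = Map(1.4f -> 1.4f)

  // Look up the connector version for a Spark version; None means "unknown".
  def connectorFor(sparkVersion: Float): Option[Float] = compat.get(sparkVersion)
}
```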
Now we move on to connecting Apache Spark to our Cassandra instance.
First we create an sbt project and include the dependencies for Spark and the spark-cassandra-connector in build.sbt:
name := "spark_cassandra"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)
Now we make a main object for our application, along with the imports it needs, as follows:
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object SparkCassandra extends App {
Now we create the Spark configuration for our application as follows:
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .setAppName("cassandra")
  .setMaster("local[*]")
  .set("spark.cassandra.connection.native.port", "9042")
  .set("spark.cassandra.connection.rpc.port", "9160")
.set("spark.cassandra.connection.host", "127.0.0.1") sets the contact point for the Cassandra cluster; here it is our local instance.
.setAppName("cassandra") sets the name of the application, which is used in the Spark logs and shown in the master UI of the cluster.
.setMaster("local[*]") specifies the master of the application.
.set("spark.cassandra.connection.native.port", "9042") sets the port on which Cassandra listens for the CQL native transport from clients.
.set("spark.cassandra.connection.rpc.port", "9160") sets the port used by the Thrift protocol.
These are the minimal settings required for connectivity to Cassandra; you can also set more advanced configuration. The corresponding server-side values are changed in the cassandra.yaml file mentioned above.
Now create a Spark context with this configuration as follows:
val sc = new SparkContext(conf)
Now you can directly make an RDD from the Cassandra table as follows:
val rdd = sc.cassandraTable("spark", "try")
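Each element of this RDD is a com.datastax.spark.connector.CassandraRow, whose columns are read with typed getters such as getFloat("name"). As a rough sketch of that mapping step, here is a plain-Scala stand-in for the row type (RowStub is hypothetical, not part of the connector):

```scala
// Hypothetical stand-in for CassandraRow, which exposes typed getters
// like getFloat(columnName) for reading columns by name.
final case class RowStub(columns: Map[String, Float]) {
  def getFloat(name: String): Float = columns(name)
}

// Mimics rdd.map(row => (row.getFloat("spark"), row.getFloat("connector")))
val rows  = Seq(RowStub(Map("spark" -> 1.4f, "connector" -> 1.4f)))
val pairs = rows.map(r => (r.getFloat("spark"), r.getFloat("connector")))
```

With the real connector, the same .map call works unchanged on the RDD and runs distributed across the executors.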
Now you can perform any transformations and actions on this RDD; here we simply collect it and print it on the console:
val file_collect = rdd.collect()
file_collect.foreach(println)
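Since collect() returns a plain Scala Array on the driver, ordinary collection operations apply to the result. A small stand-in sketch (the Array below substitutes for the real collected rows):

```scala
// Stand-in for rdd.collect() on spark.try: an Array of (spark, connector) pairs.
val collected: Array[(Float, Float)] = Array((1.4f, 1.4f))

// foreach is the idiomatic way to print: it runs purely for its side effect.
collected.foreach { case (spark, connector) =>
  println(s"Spark " + spark + " -> connector " + connector)
}

// map(println) also prints, but needlessly builds a discarded Array[Unit].
val discarded: Array[Unit] = collected.map(println)
```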
This prints the rows of the table on the console.
For the source code you can go to this repository: https://github.com/phalodi/spark-cassandra. Before running it, remember to set up and start the Cassandra instance as mentioned above.