Sharing RDD State across Spark Applications with Apache Ignite

Apache Ignite offers an abstraction over native Spark RDDs that lets the state of an RDD be shared across Spark jobs, workers, and applications, which is not possible with native Spark RDDs. In this blog, we will walk through the steps to share an RDD between two Spark applications.

Preparing Ingredients

To test Apache Ignite with a Spark application, we need at least one master process and one worker node. Download the pre-built Apache Spark binary and Apache Ignite, and put them at the same location on all nodes. Let us call these directories SPARK_HOME and IGNITE_HOME respectively.

I am assuming you are familiar with the basics of setting up a Spark cluster. If not, you can go through the Spark documentation.

Start Master Node

Switch to SPARK_HOME on the master node and run the master start script (sbin/start-master.sh in a standard Spark distribution).
As soon as you run the command, the shell reports where the log file is written: “starting org.apache.spark.deploy.master.Master, logging to … [logging_dir]”. You can get the master URL, in the form spark://master_host:master_port, from that log file.

Start Workers

Switch to SPARK_HOME on the worker node and start a worker process, supplying the master URL obtained in the previous step.
Once the worker has registered with the master, you will get a notification confirming the registration.

Start Ignite

On each worker, switch to IGNITE_HOME and start an Ignite node (bin/ignite.sh in a standard Ignite distribution).
This starts an Ignite node on that worker.

Creating Sample Spark Application

Now we will package and submit two Spark applications, RDDProducer and RDDConsumer, on the master. RDDProducer saves a pair RDD into the Ignite cache, and RDDConsumer reads it back. Here is a glimpse of the code of these two applications:

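// Package name matches the --class values used when submitting the applications.
package com.knoldus

import org.apache.ignite.configuration._
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.{SparkConf, SparkContext}

// Writes a pair RDD into the Ignite cache named "partitioned".
object RDDProducer extends App {
  val conf = new SparkConf().setAppName("SparkIgnite")
  val sc = new SparkContext(conf)
  // The closure builds a default IgniteConfiguration wherever one is needed.
  val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration())
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("partitioned")
  // Store the pairs (1, 1) through (100000, 100000), built in 10 partitions.
  sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))
}

// Reads the same cache from a separate Spark application.
object RDDConsumer extends App {
  val conf = new SparkConf().setAppName("SparkIgnite")
  val sc = new SparkContext(conf)
  val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration())
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("partitioned")
  // Keep only the pairs whose value is less than ten, then count them.
  val lessThanTen = sharedRDD.filter(_._2 < 10)
  println("The count is:::::::::::: " + lessThanTen.count())
}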

Sharing RDD from Spark Application

Let us go through the applications one by one. IgniteContext is the main entry point for Spark-Ignite integration. Here the application RDDProducer creates an IgniteContext[Int, Int] by supplying the SparkContext and a closure that instantiates a default IgniteConfiguration. Once the IgniteContext has been created, an IgniteRDD is obtained by invoking the method fromCache(“partitioned”) on it (“partitioned” is the name of the Ignite cache). The IgniteRDD is a live view of the Ignite cache holding the data, and it supports all the methods that a regular RDD supports.

The following line saves the Spark RDD into the Ignite cache:

sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))
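To make the shape of the stored data concrete, here is a minimal local sketch that uses only plain Scala collections. It is not the Ignite API: the mutable map is a hypothetical stand-in for the cache named “partitioned”, and the names SavePairsSketch, buildPairs, and saveAll are made up for this illustration. It only shows that each pair becomes one key/value entry.

```scala
import scala.collection.mutable

object SavePairsSketch {
  // Same pairs the producer builds: key i maps to value i.
  def buildPairs(n: Int): Seq[(Int, Int)] = (1 to n).map(i => (i, i))

  // Hypothetical analogue of savePairs: each pair becomes one entry
  // in an in-memory map that stands in for the Ignite cache.
  def saveAll(cache: mutable.Map[Int, Int], pairs: Seq[(Int, Int)]): Unit =
    cache ++= pairs

  def main(args: Array[String]): Unit = {
    val cache = mutable.Map.empty[Int, Int]
    saveAll(cache, buildPairs(100000))
    println(s"entries stored: ${cache.size}")
    println(s"value for key 42: ${cache(42)}")
  }
}
```

In the real application the entries live in the Ignite cluster rather than in the driver's memory, which is what lets a second application see them.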

Retrieving RDD from another Spark Application

The application RDDConsumer has the same configuration and steps as RDDProducer, except that it never saves an RDD to the Ignite cache; that has already been done by the previous application. It simply retrieves the cached RDD from the Ignite cache with

  val sharedRDD = ic.fromCache("partitioned")

and applies a filter transformation to keep the pairs whose values are less than ten, then counts those pairs and prints the count.
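The filter-and-count step can be checked locally without a cluster. The sketch below runs the same predicate over a plain Scala collection of the pairs the producer stores; FilterCountSketch and countLessThanTen are names invented for this illustration, and a local Seq stands in for the IgniteRDD.

```scala
object FilterCountSketch {
  // Mirrors sharedRDD.filter(_._2 < 10).count() on a local collection.
  def countLessThanTen(pairs: Seq[(Int, Int)]): Long =
    pairs.filter(_._2 < 10).size.toLong

  def main(args: Array[String]): Unit = {
    // Same data the producer saved: (1, 1) through (100000, 100000).
    val pairs = (1 to 100000).map(i => (i, i))
    println("The count is: " + countLessThanTen(pairs))
  }
}
```

Only the values 1 through 9 satisfy the predicate, so the local count is 9. The consumer should report the same number if the cache contains nothing beyond what the producer wrote.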

Deploying Applications

I am assuming you have packaged the applications into a jar, ready to be submitted to the cluster. The instructions for packaging a Spark application into a single jar can be found here. The application source can be found on GitHub. Switch to SPARK_HOME and run the following commands to submit these applications to the cluster:

./bin/spark-submit --class "com.knoldus.RDDProducer"  --master spark:// "/home/knoldus/Projects/Spark Lab/spark-ignite/target/scala-2.11/spark_ignite-assembly-1.0.jar"
./bin/spark-submit --class "com.knoldus.RDDConsumer"  --master spark:// "/home/knoldus/Projects/Spark Lab/spark-ignite/target/scala-2.11/spark_ignite-assembly-1.0.jar"

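We deploy these applications one after the other, changing the --class argument each time. The first application, RDDProducer, caches the pair RDD in the Ignite cache. When we then deploy the second application, RDDConsumer, it prints the number of pairs whose value is less than ten; since the producer stored the values 1 to 100000, that count should be 9, assuming the cache held nothing else.
The result shows that we were able to retrieve the RDD in another application from the Ignite cache.
For the complete code example, check out the project on GitHub.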


Apache Ignite Documentation


About Manish Mishra

Manish is a Scala Developer at Knoldus Software LLP. He loves to learn and share about Functional Programming, Scala, Akka, Spark.