Using Spark, Spray and Couchbase for lightning-fast REST APIs


As we all know, Apache Spark™ is a fast and general engine for large-scale data processing, and Couchbase is a NoSQL document database with a memory-first architecture. By connecting the two we can get lightning-fast speed.

In this blog we focus on performing CRUD operations on Couchbase at Spark's speed. I am assuming that you already have Spark and Couchbase installed on your system. If you don't, do not worry: you can easily look up the steps to download and install Spark here, and Couchbase here.

To use Couchbase's APIs with RDDs, we need a build.sbt file containing this dependency:

"com.couchbase.client" %% "spark-connector" % "1.1.0"

Your complete build.sbt should look like this:

name := "spark-spray-couchbase-starter-kit"

version := "1.0"

scalaVersion := "2.10.4"

organization := "com.knoldus"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.4.1",
"io.spray" %% "spray-can" % "1.3.3",
"io.spray" %% "spray-routing" % "1.3.3",
"org.apache.spark" %% "spark-sql" % "1.4.1",
"io.spray" %% "spray-testkit" % "1.3.3",
"org.specs2" %% "specs2" % "2.4.7",

"com.couchbase.client" %% "spark-connector" % "1.1.0"
)

assembleArtifact in packageScala := false // We don't need the Scala library, Spark already includes it

mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}

fork in run := true
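The `assembleArtifact in packageScala` and `mergeStrategy in assembly` settings above are keys of the sbt-assembly plugin, so sbt only recognises them when that plugin is enabled. A minimal sketch, assuming the 0.11.x line of sbt-assembly (the line whose key names match the ones used above); the exact version number is an assumption, so pick one that matches your sbt release:

```scala
// project/plugins.sbt (assumed plugin version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// top of build.sbt: the 0.11.x plugin is not an auto-plugin,
// so its keys and default settings have to be imported and added explicitly
import AssemblyKeys._

assemblySettings
```

Newer sbt-assembly releases are auto-plugins and renamed these keys (for example `assemblyMergeStrategy in assembly`), so the build.sbt above would need adjusting if you use one of them.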

Now you can access Couchbase's APIs for RDDs in your code.

The next step is building a service and binding it to a port.

package com.knoldus.sprayservices

import akka.actor.{ActorSystem, Props}
import akka.io.IO
import akka.util.Timeout
import spray.can.Http

import scala.concurrent.duration.DurationInt

object StartSpark extends App {

// we need an ActorSystem to host our application in
implicit val actorSystem = ActorSystem("spark-services")
// timeout for ask-style (?) messages; `30 seconds` on its own is a FiniteDuration, not an akka.util.Timeout
implicit val timeout = Timeout(30.seconds)

// create and start our service actor
val service = actorSystem.actorOf(Props[SparkServices], "spark-services")

// start a new HTTP server on port 8080 with our service actor as the handler
IO(Http) ! Http.Bind(service, "0.0.0.0", port = 8080)

}

Here we simply use the spray-can server to expose the REST API, binding our service actor to port 8080 on all network interfaces (0.0.0.0).

Now comes the main part: implementing the Couchbase CRUD operations using Spark and Spray. First we need to configure the SparkConf so that the connector knows which Couchbase node and bucket to use.

val sparkConf: SparkConf = new SparkConf().setAppName("couchbase-spark-spray-starter-kit").setMaster("local")
  .set("com.couchbase.nodes", "127.0.0.1").set("com.couchbase.bucket.userBucket", "")
val sc: SparkContext = new SparkContext(sparkConf)

Here the node is 127.0.0.1 and the bucket is userBucket; the value of the com.couchbase.bucket.userBucket setting is the bucket password, which is empty here. Using this configuration we create the SparkContext (sc).

Now we implement the CRUD operations using Couchbase's API for RDDs.
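To make the naming convention explicit: one key holds the node address, and each `com.couchbase.bucket.<bucketName>` key names a bucket to open, with the bucket password as its value. The sketch below is a hypothetical plain-Scala helper (`CouchbaseSparkSettings` is not part of the connector) that produces the same pairs as the chained `.set(...)` calls above:

```scala
// Hypothetical helper, not part of the Couchbase connector: builds the same
// key/value pairs the post sets on SparkConf.
//  - "com.couchbase.nodes"            -> address of the Couchbase node
//  - "com.couchbase.bucket.<bucket>"  -> password of that bucket ("" when it has none)
object CouchbaseSparkSettings {
  def apply(node: String, bucket: String, password: String = ""): Seq[(String, String)] =
    Seq(
      "com.couchbase.nodes" -> node,
      s"com.couchbase.bucket.$bucket" -> password
    )
}
```

With Spark on the classpath, `new SparkConf().setAppName("couchbase-spark-spray-starter-kit").setMaster("local").setAll(CouchbaseSparkSettings("127.0.0.1", "userBucket"))` should be equivalent to the configuration shown above, and keeps the bucket name and password in one place.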
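The service exposes five GET routes whose parameters are taken from the URL path. As a plain-Scala illustration (no Spray involved; `Command`, `RouteSketch` and the case classes are hypothetical names), this is roughly what a directive like `path("insert" / "name" / Segment / "email" / Segment)` does: split the path into segments, match the fixed ones, and hand each `Segment` value to the route as a `String`.

```scala
// Hypothetical model of the five routes; Spray performs this matching for real via its path DSL.
sealed trait Command
final case class Insert(name: String, email: String) extends Command
final case class UpdateViaKV(name: String, email: String, id: String) extends Command
final case class GetViaKV(id: String) extends Command
final case class GetViaView(name: String) extends Command
final case class GetViaN1ql(name: String) extends Command

object RouteSketch {
  /** Splits "/a/b/c" into its non-empty segments and matches them against the route shapes. */
  def parse(path: String): Option[Command] =
    path.split("/").toList.filter(_.nonEmpty) match {
      case List("insert", "name", n, "email", e)               => Some(Insert(n, e))
      case List("updateViaKV", "name", n, "email", e, "id", i) => Some(UpdateViaKV(n, e, i))
      case List("getViaKV", "id", i)                           => Some(GetViaKV(i))
      case List("getViaView", "name", n)                       => Some(GetViaView(n))
      case List("getViaN1Ql", "name", n)                       => Some(GetViaN1ql(n))
      case _                                                   => None // no route matches
    }
}
```

For example, `/insert/name/alice/email/alice@example.com` yields the two values that the insert route receives as `(name: String, email: String)`.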

package com.knoldus.sprayservices

import java.util.UUID

import akka.actor.{Actor, ActorContext}
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}
import spray.http.StatusCodes._
import spray.http._
import spray.routing.Directive.pimpApply
import spray.routing.HttpService

import scala.util.Try

trait SparkService extends HttpService {

  val sparkConf: SparkConf = new SparkConf().setAppName("spark-spray-starter").setMaster("local")
    .set("com.couchbase.nodes", "127.0.0.1").set("com.couchbase.bucket.userBucket", "")
  val sc: SparkContext = new SparkContext(sparkConf)

  val sparkRoutes =
    path("insert" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
      get {
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          val jsonObject = JsonObject.create().put("name", name).put("email", email)
          val jsonDocument = JsonDocument.create(documentId, jsonObject)
          val savedData = sc.parallelize(Seq(jsonDocument))
          val isSaved = Try(savedData.saveToCouchbase()).isSuccess
          if (isSaved) HttpResponse(OK, s"Data is successfully persisted with id $documentId")
          else HttpResponse(InternalServerError, "Data is not persisted and something went wrong")
        }
      }
    } ~
      path("updateViaKV" / "name" / Segment / "email" / Segment / "id" / Segment) { (name: String, email: String, id: String) =>
        get {
          complete {
            val documentId = id
            val jsonObject = JsonObject.create().put("name", name).put("email", email)
            val jsonDocument = JsonDocument.create(documentId, jsonObject)
            val savedData = sc.parallelize(Seq(jsonDocument))
            val isSaved = Try(savedData.saveToCouchbase()).isSuccess
            if (isSaved) HttpResponse(OK, s"Data is successfully persisted with id $documentId")
            else HttpResponse(InternalServerError, "Data is not persisted and something went wrong")
          }
        }

      } ~
      path("getViaKV" / "id" / Segment) { (id: String) =>
        get {
          complete {
            val idAsRDD = sc.parallelize(Seq(id))
            val fetchedDocument = Try(idAsRDD.couchbaseGet[JsonDocument]().map(_.content.toString).collect()).toOption
            fetchedDocument match {
              case Some(data) if data.nonEmpty => HttpResponse(OK, data(0))
              case Some(_) => HttpResponse(NotFound, s"No document found with id $id")
              case None => HttpResponse(InternalServerError, "Data is not fetched and something went wrong")
            }
          }
        }
      } ~
      path("getViaView" / "name" / Segment) { (name: String) =>
        get {
          complete {
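            // startKey alone returns every row from `name` onwards; the high endKey sentinel limits it to keys starting with `name`
            val viewQuery = ViewQuery.from("userDdoc", "emailtoName").startKey(name).endKey(name + "\uefff")
            val viewRDDData = Try(sc.couchbaseView(viewQuery).collect()).toOption
            val emailFetched = viewRDDData.map(_.map(row => row.value.toString))
            emailFetched match {
              case Some(data) if data.nonEmpty => HttpResponse(OK, data(0))
              case Some(_) => HttpResponse(NotFound, s"No user found whose name starts with $name")
              case None => HttpResponse(InternalServerError, "Data is not fetched and something went wrong")
            }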
          }
        }
      } ~
      path("getViaN1Ql" / "name" / Segment) { (name: String) =>
        get {
          complete {
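            // bind the user input as a positional parameter ($1) instead of splicing it into the statement, to avoid N1QL injection
            import com.couchbase.client.java.document.json.JsonArray
            val n1qlQuery = N1qlQuery.parameterized("SELECT * FROM `userBucket` WHERE name LIKE $1", JsonArray.from(name + "%"))
            val n1qlRDD = Try(sc.couchbaseQuery(n1qlQuery).collect()).toOption
            val emailFetched = n1qlRDD.map(_.map(row => row.value.toString))
            emailFetched match {
              case Some(data) if data.nonEmpty => HttpResponse(OK, data(0))
              case Some(_) => HttpResponse(NotFound, s"No user found whose name starts with $name")
              case None => HttpResponse(InternalServerError, "Data is not fetched and something went wrong")
            }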
          }
        }
      }

}

class SparkServices extends Actor with SparkService {
  def actorRefFactory: ActorContext = context

  def receive: Actor.Receive = runRoute(sparkRoutes)
}
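Every route above follows the same pattern: run a Spark action inside `Try`, then turn the outcome into an HTTP response. A spray-free sketch of factoring that out, assuming plain `(statusCode, body)` pairs instead of Spray's `HttpResponse` (the `Responses.respond` name is hypothetical). It also separates an action that succeeds but finds no rows (mapped here to 404) from a genuine failure (500).

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Spray or the Couchbase connector:
// maps the outcome of a Spark/Couchbase action to an (HTTP status code, body) pair.
object Responses {
  def respond(result: Try[Seq[String]]): (Int, String) = result match {
    case Success(first +: _) => (200, first)                     // at least one row: return the first
    case Success(_)          => (404, "No matching data found")   // action succeeded but found nothing
    case Failure(e)          => (500, s"Something went wrong: ${e.getMessage}")
  }
}
```

Inside a route it would be called as `Responses.respond(Try(rdd.collect().toSeq))` (the `.toSeq` turns the collected `Array` into a `Seq`), with Spray's `HttpResponse` built from the returned pair.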

The methods saveToCouchbase(), couchbaseGet(), couchbaseView() and couchbaseQuery() are provided by the Couchbase Spark connector (through import com.couchbase.spark._): the first two operate on RDDs, the last two on the SparkContext. This is a basic implementation of CRUD operations on Couchbase using Spark.

We can also use the power of N1QL through Spark SQL: the connector integrates with the SQLContext, so DataFrame queries against Couchbase are executed via N1QL.

If you want, you can check out the code; it is in this repository.
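One detail worth knowing about couchbaseView(): a view query with only `startKey(k)` returns every row whose key sorts at or after `k`, not just rows equal to it, so the first row may belong to a different user. The plain-Scala sketch below models that on a sorted map; it does not talk to Couchbase, the data and names are hypothetical, and it assumes the `emailtoName` view emits the user's name as key and email as value (which is how the getViaView route uses it). Couchbase orders view keys with Unicode collation, whereas this model uses plain `String` ordering.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory stand-in for a view index: key = user name, value = email,
// kept sorted by key (plain String ordering, not Couchbase's Unicode collation).
object ViewKeySketch {
  val index: TreeMap[String, String] = TreeMap(
    "alice" -> "alice@example.com",
    "bob"   -> "bob@example.com",
    "carol" -> "carol@example.com"
  )

  /** Roughly what startKey(k) alone returns: every row whose key sorts at or after k. */
  def withStartKey(k: String): List[(String, String)] = index.from(k).toList

  /** Roughly what a bounded prefix range (startKey(k) plus a high endKey) returns. */
  def withPrefix(k: String): List[(String, String)] =
    index.from(k).toList.takeWhile { case (key, _) => key.startsWith(k) }
}
```

Looking up the missing name `"anna"` with `withStartKey` returns Bob's row first, while `withPrefix("anna")` returns nothing, which is the behaviour a "get by name" endpoint usually wants.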



About shiv4nsh

Coder, Gamer, Learner..!!