Cassandra with Spark 2.0: Building a REST API


In this tutorial, we will demonstrate how to build a REST service on Spark, using Akka-http as a sidekick 😉 and Cassandra as the data store.

We have already seen the power of Spark, and when it is combined with Cassandra in the right way it becomes even more powerful. An earlier blog post showed how to build a REST API on Spark and Couchbase; this one does the same thing with Cassandra.

So let's get started with the code.

Your build.sbt should look like this:

name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.knoldus"

val akkaV = "2.4.5"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  "net.liftweb" % "lift-json_2.11" % "2.6.2"
)

assembleArtifact in assemblyPackageScala := false // We don't need the Scala library, Spark already includes it

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map {
  _.copy(overrideScalaVersion = true)
}

fork in run := true
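
The layers that follow use a `User` case class and import `sc`, `keyspace` and `tableName` from a `Context` object, neither of which is listed in this post. Here is a minimal sketch of what they might look like, assuming a single local Cassandra node. The field names, keyspace, table name, host and master URL are all assumptions for illustration, not taken from the original project.

```scala
package com.knoldus.domain {
  // Field names are assumed; the connector maps them to columns of the same name.
  case class User(id: String, name: String, email: String)
}

package com.knoldus.factories {

  import org.apache.spark.{SparkConf, SparkContext}

  object Context {
    // Hypothetical names. saveToCassandra expects the table to exist already,
    // for example (CQL):
    //   CREATE TABLE user_keyspace.users (id text PRIMARY KEY, name text, email text);
    val keyspace = "user_keyspace"
    val tableName = "users"

    // Assumed local setup: Spark in local mode, Cassandra on 127.0.0.1.
    private val conf = new SparkConf()
      .setAppName("cassandra-spark-akka-http-starter-kit")
      .setMaster("local[*]")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // One shared SparkContext for the whole service.
    val sc = new SparkContext(conf)
  }
}
```

Keeping a single `SparkContext` in one object means every request reuses the same connection to the cluster instead of creating a new context per call.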

Database Access layer:

Your database access layer should look like this:

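package com.knoldus.factories

import com.datastax.spark.connector._ // adds saveToCassandra and cassandraTable
import com.knoldus.domain.User

import scala.util.Try

trait DatabaseAccess {

  import Context._

  // Returns true if the write succeeded; any non-fatal exception becomes false.
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Returns None if the query failed and Some(empty array) if no row matched.
  // The id is passed as a bound value rather than spliced into the CQL string.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where("id = ?", id).collect()).toOption
}

object DatabaseAccess extends DatabaseAccess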
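
Both methods above lean on `scala.util.Try` to turn exceptions into return values. A minimal sketch of that pattern in plain Scala follows, with a hypothetical `write` function standing in for the Cassandra call. None of these names come from the original project.

```scala
import scala.util.{Failure, Success, Try}

object TryOutcomes {
  // Hypothetical stand-in for a call that may throw, such as a database write.
  def write(shouldFail: Boolean): Unit =
    if (shouldFail) throw new IllegalStateException("write failed")

  // Same shape as create(): true on success, false on any non-fatal exception.
  def asBoolean(shouldFail: Boolean): Boolean =
    Try(write(shouldFail)).toOption.isDefined

  // Alternative that keeps the failure reason instead of discarding it.
  def asEither(shouldFail: Boolean): Either[String, Unit] =
    Try(write(shouldFail)) match {
      case Success(_)  => Right(())
      case Failure(ex) => Left(ex.getMessage)
    }
}
```

One consequence: since `create` already absorbs non-fatal exceptions, a `try`/`catch` around it in the caller will rarely fire. Returning an `Either` (or the `Try` itself) would keep the failure reason available for logging.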

Service Layer:

Your routing file should look like this:

package com.knoldus.routes

import java.util.UUID

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.knoldus.domain.User
import com.knoldus.factories.DatabaseAccess
import net.liftweb.json._
import java.util.Date
import net.liftweb.json.Extraction._

trait SparkService extends DatabaseAccess {

  implicit val system:ActorSystem
  implicit val materializer:ActorMaterializer
  val logger = Logging(system, getClass)

  // Only ArithmeticException is handled here; other exceptions fall through to the default handler.
  implicit def myExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: ArithmeticException =>
        complete(HttpResponse(StatusCodes.InternalServerError, entity = "Data is not persisted and something went wrong"))
    }

  implicit val formats: Formats = new DefaultFormats {
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId,name,email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("retrieve" / "id" / Segment) { (listOfIds: String) =>
      get {
        complete {
          try {
            // None means the query itself failed; an empty array means no row matched the id.
            val users: Option[Array[User]] = retrieve(listOfIds)
            users match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
              case None => HttpResponse(StatusCodes.InternalServerError, entity = "Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
          }
        }
      }
    }
  }
}
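
The trait above only defines the routes; something still has to supply the `ActorSystem` and `ActorMaterializer` and bind the routes to a port. Here is a rough sketch of one way to do that with `Http().bindAndHandle`. The class and object names, the actor system name, the host and the port are all assumptions, not taken from the original project.

```scala
package com.knoldus

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.knoldus.routes.SparkService

import scala.concurrent.Future

// Hypothetical wrapper class. Passing the system and materializer as constructor
// parameters means they are set before the trait body (and its logger) is initialised.
class SparkServer(implicit val system: ActorSystem, val materializer: ActorMaterializer)
    extends SparkService {

  // Binds the routes defined in SparkService to the given host and port.
  def start(host: String, port: Int): Future[Http.ServerBinding] =
    Http().bindAndHandle(sparkRoutes, host, port)
}

object StartApplication extends App {
  implicit val system: ActorSystem = ActorSystem("spark-cassandra-service")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Assumed host and port; adjust to your environment.
  new SparkServer().start("localhost", 8080)
}
```

With the server running on these assumed settings, a GET request to `/create/name/<name>/email/<email>` stores a user and returns its generated id, and a GET request to `/retrieve/id/<id>` returns that user as JSON.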

 

This post continues the earlier one on building REST services with Spark and Couchbase. Here we have only changed the data store to Cassandra, so I have not explained each and every step. It contains just a simple implementation of the REST API. If you want to know the details, please take a look here:

Scala, Couchbase, Spark and Akka-http: A combinatory tutorial for starters

In a future post, we will do the same thing with Neo4j too :P.

So stay tuned!

You can find the code on my GitHub: shiv4nsh

If you have any questions, you can contact me here or on Twitter: @shiv4nsh

I would be happy to help.

Till then, happy hAKKAing!!!


