Streaming data from Cassandra using Alpakka


The Alpakka project is an open-source initiative to implement stream-aware and reactive integration pipelines for Java and Scala. It is built on top of Akka Streams and provides a DSL for reactive and stream-oriented programming, with built-in support for backpressure so that a fast producer cannot flood a slower consumer. Akka Streams is a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant implementation and is therefore fully interoperable with other implementations. In this blog, we provide a sample that writes data to and reads data from a Cassandra table using Alpakka.

The Alpakka Cassandra connector allows us to read from and write to Cassandra tables. Using CassandraSource we can read data from a Cassandra table, and using CassandraFlow or CassandraSink we can write data to one.


To discuss this more, let’s take an example and go through the code:

Sample project:

To understand the Alpakka Cassandra connector we have created a sample project having different modules to separate out the data generation and processing part of the pipeline.

The sample project is divided into two major parts:

  1. GeneratorApp
  2. ProcessorApp

The Generator app populates a Cassandra table using Alpakka, and the Processor app consumes data from that table.

Let’s discuss these two in detail:

Project dependencies:

Here is the dependency for Alpakka:

val alpakkaVersion = "1.1.0"

def alpakka: Def.Initialize[List[ModuleID]] = Def.setting {
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % alpakkaVersion :: Nil
}

To keep the focus on Alpakka, only the Alpakka dependency is shown here. You can find all the dependencies in Dependency.scala in the GitHub repo.
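If your build does not use a dependencies helper like ours, the same dependency can be declared directly in build.sbt. This is a minimal sketch, assuming a single-module build and the same connector version as above:

```scala
// build.sbt: direct declaration of the Alpakka Cassandra connector,
// without the Def.setting wrapper used in the sample project.
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0"
```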

Build.sbt:

Here is the “build.sbt” file for the sample project:

name := "alpakka-sample"

version := "1.0"
scalaVersion := "2.12.7"

import Dependencies._
import ProjectSettings._

lazy val commonUtil = BaseProject("common-util")
  .settings(
    libraryDependencies ++= compileDependencies(akkaHttp.value ++ akka.value ++ json4sNative.value ++ json4sEx.value ++
      typesafeConfig.value ++ slf4j.value ++ log4j.value ++ logback.value ++ jodaDate.value ++ alpakka.value)
      ++ testDependencies(mockito.value ++ scalaTest.value ++ spec2.value),
    parallelExecution in Test := false
  )

lazy val core = BaseProject("core")
  .settings(
    libraryDependencies ++= compileDependencies(Nil)
      ++ testDependencies(Nil),
    parallelExecution in Test := false
  )
  .dependsOn(commonUtil)

lazy val processor = BaseProject("processor")
  .settings(
    libraryDependencies ++= compileDependencies(Nil)
      ++ testDependencies(Nil),
    parallelExecution in Test := false
  )
  .dependsOn(commonUtil, core)

lazy val generator = BaseProject("generator")
  .settings(
    libraryDependencies ++= compileDependencies(Nil)
      ++ testDependencies(Nil),
    parallelExecution in Test := false
  )
  .dependsOn(commonUtil, core)

As you can see, the project has two major parts: a generator, which populates a Cassandra table using the Alpakka connector, and a processor, which reads data from that table using the same connector.

The Alpakka connector provides various APIs for reading and writing data. To make better use of the Cassandra session, we have separated the components into multiple pieces.

Cassandra Session:

Here is an example of CassandraSession created in the code which is required to establish a connection with Cassandra:

import com.datastax.driver.core._

trait CassandraSessionProvider {

  private val cassandraHost = "127.0.0.1"

  implicit val cassandraSession: Session = Cluster.builder
    .addContactPoint(cassandraHost)
    .withPort(9042)
    .build
    .connect()

}

As you can see, we are creating a session object using the Cluster class. You can provide the Cassandra contact points and port using the “.addContactPoint” and “.withPort” functions. Once we have a Cassandra session, we can use it for querying Cassandra.

Writing data to Cassandra table:

The next thing to do is to write data into a Cassandra table. Let’s have a look at the APIs Alpakka provides to do that easily using Akka Streams. As part of this module, we generate some dummy data and push it to a Cassandra table using the Alpakka connector.

Here is the code for GeneratorApp, an App that uses the Alpakka connector to write data into a Cassandra table:

import java.time.Instant
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import akka.stream.scaladsl.Source
import com.common.cassandra.CassandraSessionProvider
import com.common.logger.Logging
import com.datastax.driver.core.Session
import com.sample.model.Student
import com.sample.repositories.StudentRepo
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object GeneratorApp extends App with CassandraSessionProvider with Logging with StudentRepo {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit lazy val materializer: ActorMaterializer = ActorMaterializer()
  implicit val session: Session= cassandraSession

  val currentMillies = Instant.now().toEpochMilli
  val students = (10 to 2000).map (elem => Student(currentMillies + elem, "name_" + (currentMillies + elem))).toList
  val sink = CassandraSink[Student](parallelism = 20, preparedStatementInsert, statementBinder)

  val result: Future[Done] = Source(students)
    .map { student =>
      info(s"Student found : [$student]")
      student
    }
    .runWith(sink)

  result.map { result =>
    info(s"Response found: [$result]")
  }.recover {
    case ex: Exception =>
      error(s"Error found while ingesting student into cassandra data: [$ex]")
      throw ex
  }
}

As you can see, in the above example we mix in CassandraSessionProvider to bring a Cassandra Session into scope. To run the application we need an actor system and an actor materializer. The materializer runs the streaming pipeline: it creates the actors that execute the transformation steps of the stream. We also define the session variable from cassandraSession, because StudentRepo needs it to prepare its statements.

implicit val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
implicit val session: Session= cassandraSession

The next step is generating sample data. The variable students is initialized with a list of roughly 2K Student objects (1,991 to be exact, since the range runs from 10 to 2000 inclusive) to be written into the Cassandra table. The currentMillies value is used only to generate new ids every time the application runs.

val currentMillies = Instant.now().toEpochMilli
val students = (10 to 2000).map (elem => Student(currentMillies + elem, "name_" + (currentMillies + elem))).toList
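The Student model itself is not shown in this post. The sketch below assumes it is a plain case class with a Long id and a String name (matching how it is constructed above) and wraps the generation step in a small helper; StudentGenerator and its parameter names are hypothetical and only serve the illustration.

```scala
import java.time.Instant

// Assumed shape of the model: the post only shows Student(id, name) being constructed.
final case class Student(id: Long, name: String)

object StudentGenerator {
  // Builds one Student per offset. Ids are base + offset, so a new base yields new ids.
  def generate(base: Long, offsets: Range = 10 to 2000): List[Student] =
    offsets.map(offset => Student(base + offset, s"name_${base + offset}")).toList
}

object StudentGeneratorDemo {
  def main(args: Array[String]): Unit = {
    val students = StudentGenerator.generate(Instant.now().toEpochMilli)
    println(s"Generated ${students.size} students, first: ${students.head}")
  }
}
```

Passing the base explicitly keeps the generation deterministic, which makes it easier to check than reading the clock inside the function.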

Now we need a Cassandra sink that points to the database table into which the data will be inserted. The Alpakka connector provides CassandraSink, which creates a sink from a parallelism value, a PreparedStatement, a statement binder, and an implicit Cassandra session.

The parallelism value sets how many inserts can be in flight at the same time. The prepared statement is the query used to insert data into the Cassandra table. The statement binder binds each element's fields to the placeholders of that query, since most of the time we push data in the form of case classes.

val sink = CassandraSink[Student](parallelism = 20, preparedStatementInsert, statementBinder)
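The preparedStatementInsert and statementBinder values come from StudentRepo, which is not listed in this post. The following is a hypothetical sketch of what the write side of such a trait could look like with the DataStax 3.x driver; the trait name, the keyspace value and the Student shape are assumptions based on the table created later in this post.

```scala
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}

// Assumed model, matching the student(id bigint, name text) table.
final case class Student(id: Long, name: String)

// Hypothetical sketch of the write side of StudentRepo; the real trait lives in the sample repository.
trait StudentRepoSketch {
  // Supplied by the application, e.g. from CassandraSessionProvider.
  implicit def session: Session

  // Assumed keyspace name, taken from the cqlsh commands in this post.
  val keyspace: String = "alpakka_sample"

  // Prepared lazily on first use and then reused for every element in the stream.
  lazy val preparedStatementInsert: PreparedStatement =
    session.prepare(s"INSERT INTO $keyspace.student(id, name) VALUES (?, ?)")

  // Binds one Student to the two placeholders; the Java driver expects boxed values.
  val statementBinder: (Student, PreparedStatement) => BoundStatement =
    (student, statement) => statement.bind(Long.box(student.id), student.name)
}
```

Keeping the statement lazy means nothing is sent to Cassandra until the sink is actually built.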

Once the data and the CassandraSink are ready, we can start the stream from an Akka Streams Source. The call “.runWith(sink)” materializes and starts the stream, and the elements are then written to Cassandra. At the bottom of the file, we use “recover” to log errors. Of course, you can choose to handle the exception differently and take whatever steps suit your application.

Reading data from Cassandra table:

Just like writing data, we can read data from a Cassandra table using Akka Streams. To demonstrate this with the Alpakka connector, we have created a processor component that reads rows from the Cassandra table and passes them to Akka actors for parallel processing.

Here is the code (ProcessorApp) showing the steps required to read data from the Cassandra table:

import java.util.concurrent.TimeUnit
import akka.Done
import akka.actor.ActorSystem
import akka.routing.{Pool, RoundRobinPool}
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import com.common.cassandra.CassandraSessionProvider
import com.common.logger.Logging
import com.datastax.driver.core.{Row, Session}
import com.sample.model.Student
import com.sample.repositories.StudentRepo
import scala.concurrent.ExecutionContext.Implicits.global

object ProcessorApp extends App with StudentRepo with CassandraSessionProvider with Logging {

  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit lazy val materializer = ActorMaterializer()
  override implicit val session: Session= cassandraSession
  implicit val timeout: Timeout = Timeout(50, TimeUnit.MILLISECONDS)

  // A pool of actors to process the messages
  val actorRouter = actorSystem.actorOf(
    RoundRobinPool(10, supervisorStrategy = Pool.defaultSupervisorStrategy)
      .props(ProcessorActor.props(actorSystem))
  )

  val result = CassandraSource(prepareStatementSelect)
    .map (transform)
    .ask[Done](10)(actorRouter)
    .runWith(Sink.ignore)

  result.recover {
    case ex: Exception =>
      error(s"Error found while fetching data from cassandra table: [$ex]")
      throw ex
  }

  private def transform(row: Row): Student = {
    info(s"Got a new row from cassandra table : [$row]")
    Student(
      row.getLong("id"),
      row.getString("name")
    )
  }

}

As with writing, we need an actor system, a materializer and a Cassandra session to read data from the Cassandra table:

implicit val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
override implicit val session: Session= cassandraSession

The next thing is to write the processing logic for the messages. It can live in a single function, in a composition of functions, or in an Akka actor that processes messages in parallel.

For this sample, we have created a separate actor (ProcessorActor) and placed it behind an actor router, which gives us a pool of actors that can process multiple messages simultaneously.

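import akka.Done
import akka.actor.Actor
import com.common.logger.Logging

class ProcessorActor extends Actor with Logging {

  override def receive: Receive = {
    // Log the student and acknowledge it so the stream's ask stage can continue
    case student: Student =>
      info(s"Got new student to persist: [$student]")
      sender() ! Done
    case msg =>
      warn(s"Unknown messages found : [$msg]")
  }
}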

It’s a very simple actor that logs the message and returns a Done response to the sender. On the application side, we create a router for this actor, which works as an actor pool and uses a round-robin algorithm to forward messages to the actors inside the pool.

// A pool of actors to process the messages
val actorRouter = actorSystem.actorOf(
  RoundRobinPool(10, supervisorStrategy = Pool.defaultSupervisorStrategy)
    .props(ProcessorActor.props(actorSystem))
)

The next task is to create a stream to read data from Cassandra table:

val result = CassandraSource(prepareStatementSelect)
  .map (transform)
  .ask[Done](10)(actorRouter)
  .runWith(Sink.ignore)

CassandraSource is a utility provided by the Alpakka connector to read data from a Cassandra table. It requires a Cassandra session and the statement to execute against the table.

lazy val prepareStatementSelect: Statement = new SimpleStatement(s"SELECT * FROM $keyspace.student").setFetchSize(fetchSize)

The query returns a stream of data from Cassandra in the form of Row objects. A Row is an abstraction over Cassandra data provided by the Cassandra driver. To convert each row into the required case class, we use a method called transform, which returns a Student that can be easily processed by Akka actors.

private def transform(row: Row): Student = {
  info(s"Got a new row from cassandra table : [$row]")
  Student(
    row.getLong("id"),
    row.getString("name")
  )
}

Once we have the Student case class, we can pass this directly to the actor using the ask method.

.ask[Done](10)(actorRouter)

Here Done is the type of reply expected from the actor, 10 is the parallelism factor (the maximum number of asks in flight at once), and actorRouter is the actor pool.
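To see the ask stage in isolation, here is a small sketch that does not need Cassandra. It assumes Akka 2.5.x (where ActorMaterializer is still the usual way to run a stream); EchoWorker and AskSketch are hypothetical names used only for this illustration. Note that the implicit Timeout matters: if a worker does not reply in time, the stream fails with an AskTimeoutException.

```scala
import akka.Done
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical worker: acknowledges every message with Done.
class EchoWorker extends Actor {
  override def receive: Receive = { case _ => sender() ! Done }
}

object AskSketch {
  // Sends `messages` elements through a pool of workers and returns the number of replies.
  def run(messages: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("ask-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Upper bound on how long each ask waits for its reply.
    implicit val timeout: Timeout = Timeout(1.second)

    val router = system.actorOf(RoundRobinPool(4).props(Props[EchoWorker]))
    try {
      // At most 2 asks are in flight at once; each reply is emitted downstream.
      val acks = Source(1 to messages)
        .ask[Done](parallelism = 2)(router)
        .runWith(Sink.seq)
      Await.result(acks, 5.seconds).size
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"Received ${run(10)} acknowledgements")
}
```

In the sample application the timeout is 50 milliseconds, so a slow actor would fail the stream quickly; a larger value may be more forgiving if the processing step does real work.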

In addition to that, we are using recover to log potential errors.

Running sample Applications:

We now have all the components required to test our implementation. Here are the steps to run both the generator and the processor applications:

Step 1. Run Cassandra in your local environment

Step 2. Create Cassandra tables using cqlsh tool:

cqlsh> create KEYSPACE alpakka_sample with replication = {'class':'SimpleStrategy','replication_factor':1};

cqlsh> create table alpakka_sample.student(id bigint PRIMARY KEY, name text);

Step 3. Run generator application using the following command:

sbt "project generator" run

Step 4. Run processor application using the following command:

sbt "project processor" run

Note: The connector has no built-in way to read from Cassandra continuously at fixed or variable time intervals. To achieve that, you can wrap the source in “RestartSource.withBackoff”, which restarts the Cassandra source after it completes.
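As a rough sketch of that idea, assuming the Akka 2.5.x signature of RestartSource.withBackoff (minBackoff, maxBackoff, randomFactor) and hypothetical helper names, the source could be wrapped like this. Keep in mind that each restart re-runs the same statement, so all matching rows are emitted again.

```scala
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{RestartSource, Source}
import com.datastax.driver.core.{Row, Session, Statement}
import scala.concurrent.duration._

// Hypothetical helper, not part of Alpakka.
object PollingSource {
  // Recreates the inner source whenever it completes or fails,
  // waiting for a backoff bounded by minBackoff and maxBackoff between runs.
  def repeat[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration)(
      factory: () => Source[T, _]): Source[T, NotUsed] =
    RestartSource.withBackoff(minBackoff, maxBackoff, 0.2)(factory)

  // Re-runs the given query against Cassandra on every restart.
  def rows(statement: Statement)(implicit session: Session): Source[Row, NotUsed] =
    repeat(5.seconds, 30.seconds)(() => CassandraSource(statement))
}
```

The backoff values here are placeholders; pick them according to how fresh the data needs to be and how much load the cluster can take.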

In our next blog, we will discuss that approach as well.

Hope you enjoyed the blog.


Written by 

Girish is a Software Consultant with more than 3.5 years of experience. He is a Scala developer and is passionate about the Scala ecosystem. He has also worked on projects in other languages, such as Java and ASP.NET. He works well in both supervised and unsupervised environments and is almost always in front of his laptop's screen, whether working or not. He is self-motivated, dedicated and focused on his work, and believes in developing quality products. He wants to work on different projects in different domains, and is curious to learn about them and to provide solutions that use resources well and improve performance. His personal interests include reading books, listening to music, video games, cricket and social networking. He holds a Master's in Computer Applications from Lal Bahadur Shastri Institute of Management, New Delhi.
