Akka Persistence using MongoDB – Part I


Akka is a powerful actor-based toolkit for concurrency, parallelism and clustering. Actors are not the right fit for every concurrent application, though, because Futures are another way to achieve concurrency. Many developers follow a simple rule for choosing between the two: “Futures for Concurrency, Actors for State”.

Concurrent applications often run into race conditions and deadlocks, which is why we use actors to maintain state in them. Actors, however, keep their state in memory. When the system fails, that state is hard to recover, and the whole application can go down with it. To keep actor state on secondary storage, the Akka toolkit provides a module called “Persistence”.

Today we are building a transaction system in which a TransactionActor handles Debit/Credit requests and maintains the balance in its actor state.

class TransactionActor(openBalance: Double)

Akka Persistence uses “Event Sourcing”: it persists events rather than the current state, and at recovery time all stored events are replayed in memory to rebuild that state. For more details, see Event Sourcing.
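To make the replay idea concrete, here is a minimal sketch in plain Scala, without Akka. The object name EventSourcingSketch and the helper names applyEvent and replay are made up for illustration. The event types only mirror the Debited/Credited events used later in this post, and the arithmetic is an assumption about what those events mean.

```scala
object EventSourcingSketch {
  // Hypothetical event types, mirroring the events used later in this post.
  sealed trait Event
  final case class Debited(amount: Double) extends Event
  final case class Credited(amount: Double) extends Event

  // Apply a single event to the current balance (assumed semantics).
  def applyEvent(balance: Double, event: Event): Double = event match {
    case Debited(amount)  => balance - amount
    case Credited(amount) => balance + amount
  }

  // Recovery: fold the stored events, in order, over the opening balance.
  def replay(openBalance: Double, events: Seq[Event]): Double =
    events.foldLeft(openBalance)(applyEvent)
}
```

The state is never stored directly. It is always derived from the opening balance and the ordered list of events, so replaying the same events gives the same balance every time.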

Akka Persistence ships with a LevelDB journal plugin for storing events, but the Akka community provides plenty of plugins for other databases, and we can also write our own custom plugin.

In the transaction example we use MongoDB to store events, and an in-memory database for the test cases. The corresponding plugins are akka-persistence-mongo and akka-persistence-inmemory.

Add Dependencies to build.sbt:

libraryDependencies ++= {
  val akkaVersion = "2.4.12"

  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "ch.qos.logback" % "logback-classic" % "1.1.7",
    "com.github.ironfish" %% "akka-persistence-mongo"  % "1.0.0-SNAPSHOT",
    "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
    "org.scalatest" %% "scalatest" % "3.0.1" % "test",
    "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.3.14" % "test"
  )
}
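Adding a plugin to the build is not enough on its own: Akka Persistence also has to be told which journal to use. As a sketch, the test configuration for the in-memory plugin could look like the fragment below. The file location is an assumption, and the plugin IDs are the ones the akka-persistence-inmemory plugin registers as far as I know, so check them against the documentation of the plugin version you use. The MongoDB plugin is selected in the same way, with its own plugin ID and connection settings taken from its README.

```hocon
# src/test/resources/application.conf (assumed location)
akka.persistence.journal.plugin = "inmemory-journal"
akka.persistence.snapshot-store.plugin = "inmemory-snapshot-store"
```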

TransactionActor:


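import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}

// Event, Debited, Credited, Debit, Credit, PrintBalance, GetBalance and
// TransactionService come from the example project and are not shown here.
class TransactionActor(openBalance: Double) extends PersistentActor with ActorLogging {

  // In-memory state, rebuilt from the persisted events on recovery.
  var balance = openBalance

  // Called for each persisted event while the actor is recovering.
  override def receiveRecover: Receive = {
    case event: Event =>
      log.info(s"$event Recover Start")
      updateBalance(event)

    case RecoveryCompleted => log.info("Balance recovery Completed")
  }

  // Called for incoming commands once recovery has completed.
  override def receiveCommand: Receive = {
    case Debit(amount) if balance >= amount =>
      log.info(s"$amount: Amount Debit ... ")
      // Persist first; update the state and reply only once the event is stored.
      persist(Debited(amount)) { event =>
        updateBalance(event)
        sender() ! "Done Debit"
      }

    case Credit(amount) =>
      log.info(s"$amount: Amount Credit ... ")
      persist(Credited(amount)) { event =>
        updateBalance(event)
        sender() ! "Done Credit"
      }

    case PrintBalance => log.info(s"Remaining Balance: $balance")
    case GetBalance   => sender() ! balance
  }

  // Identifies this actor's events in the journal.
  override def persistenceId: String = TransactionActor.name

  // Applies a single event to the balance.
  val updateBalance: Event => Unit = {
    case Debited(amount)  => balance = TransactionService.balanceDebit(balance, amount)
    case Credited(amount) => balance = TransactionService.balanceCredit(balance, amount)
  }
}

object TransactionActor {

  def props(openBalance: Double): Props = Props(classOf[TransactionActor], openBalance)

  val name = "balance-transactions"
}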
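The actor above refers to commands, events and a TransactionService that this post does not show. The following is only a guess at what they might look like, inferred from how the actor uses them. The wrapper object TransactionProtocol, the Command trait and the plain subtraction/addition inside TransactionService are assumptions, and the real definitions in the example project may differ.

```scala
// Hypothetical message protocol, inferred from the pattern matches in the actor.
object TransactionProtocol {
  // Commands: requests sent to the actor.
  sealed trait Command
  final case class Debit(amount: Double) extends Command
  final case class Credit(amount: Double) extends Command
  case object PrintBalance extends Command
  case object GetBalance extends Command

  // Events: facts that are persisted once a command has been accepted.
  sealed trait Event
  final case class Debited(amount: Double) extends Event
  final case class Credited(amount: Double) extends Event
}

// Hypothetical balance arithmetic, kept outside the actor so it is easy to test.
object TransactionService {
  def balanceDebit(balance: Double, amount: Double): Double = balance - amount
  def balanceCredit(balance: Double, amount: Double): Double = balance + amount
}
```

Keeping commands and events as separate types makes the difference visible: a command may still be rejected (for example a Debit larger than the balance), while an event describes something that has already happened and is safe to replay.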

Explanation:

A plain actor only needs to override one method, receive, to handle messages. A PersistentActor needs to override two: receiveRecover and receiveCommand. They work as follows:

  • receiveRecover: Is used to receive past events and snapshots while the actor is recovering.
  • receiveCommand: Is used to handle messages after the actor has recovered.

Example:

[Figure: new-doc-1_1]

In the story above, Step I creates an Akka actor with two methods, receiveRecover and receiveCommand. When the persistent actor is created, the first call goes to receiveRecover, which replays the persisted events, applies each one, and stores the result in the actor state.

After recovery, all actor messages are handled by receiveCommand. In this method every event is first persisted to the database, and only then is the action performed. If the event is not persisted successfully, the action is never performed. In Step II the actor goes down, through a poison pill or for any other reason, but all our events are already in the database.

[Figure: new-doc-1_2]

In Step III another actor with the same persistenceId is created to take over. receiveRecover is called again, and it recovers all events and loads them into memory. Once all events have been replayed, the actor is back in its last state and continues handling messages.
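The three steps can be imitated without Akka in a few lines of plain Scala. This is a simplified model, not how Akka implements persistence: Journal and Account are hypothetical stand-ins for the event store and the persistent actor, and the available flag exists only to simulate a failed write. In Akka itself a failed persist stops the actor, whereas the sketch merely returns false.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object RecoverySketch {
  sealed trait Event
  final case class Debited(amount: Double) extends Event
  final case class Credited(amount: Double) extends Event

  // Stand-in for the event store; set `available = false` to simulate a failed write.
  final class Journal {
    private val stored = ArrayBuffer.empty[Event]
    var available: Boolean = true

    def write(event: Event): Try[Event] =
      if (available) { stored += event; Success(event) }
      else Failure(new IllegalStateException("journal unavailable"))

    def events: List[Event] = stored.toList
  }

  // Stand-in for the persistent actor.
  final class Account(openBalance: Double, journal: Journal) {
    private var current = openBalance

    // Steps I and III: replay everything already in the journal on creation.
    journal.events.foreach(update)

    private def update(event: Event): Unit = event match {
      case Debited(amount)  => current -= amount
      case Credited(amount) => current += amount
    }

    // Persist first; the state changes only if the write succeeded.
    private def persist(event: Event): Boolean = journal.write(event) match {
      case Success(stored) => update(stored); true
      case Failure(_)      => false
    }

    def balance: Double = current
    def credit(amount: Double): Boolean = persist(Credited(amount))
    def debit(amount: Double): Boolean = current >= amount && persist(Debited(amount))
  }
}
```

Create an Account with an opening balance of 100.0, credit 50.0 and debit 30.0, and the balance is 120.0. A second Account created later on the same Journal, again with an opening balance of 100.0, also starts at 120.0, because it replays the two stored events. A credit attempted while the journal is unavailable returns false and leaves the balance untouched.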

To download the complete source code of the example, click on this link.

Akka Persistence also supports Snapshots, Persistence Queries and Clustering, all of which we will discuss in our upcoming blogs.
