Akka Persistence using MongoDB – Part I

Akka is a powerful actor-based toolkit for concurrency, parallelism, and clustering. Actors are not the right fit for every concurrent application, though, because Futures are another way to achieve concurrency. Many developers follow a simple rule of thumb when choosing between the two: “Futures for concurrency, actors for state”.

Concurrent applications frequently run into race conditions and deadlocks. Actors help here because each actor owns its state and processes one message at a time, but that state lives only in memory. When the system fails, the actor state is lost and difficult to recover, and the whole application can go down with it. To keep actor state on durable storage, the Akka toolkit provides Akka Persistence.

Today we build a transaction system in which a TransactionActor handles Debit and Credit requests and keeps the balance in its actor state.

class TransactionActor(openBalance: Double)

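Akka Persistence uses event sourcing: instead of storing the current state, it persists the events that changed the state, and at recovery time it replays all of those events in memory to rebuild the state. For more details, see an introduction to event sourcing.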
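The replay idea can be sketched in plain Scala, without Akka. This is only an illustration: the event types and the `applyEvent` and `replay` helpers are hypothetical names for this sketch, not the definitions from the example project.

```scala
object EventSourcingSketch {
  // Hypothetical event types for this sketch only.
  sealed trait Event
  final case class Debited(amount: Double) extends Event
  final case class Credited(amount: Double) extends Event

  // Apply a single event to the current balance.
  def applyEvent(balance: Double, event: Event): Double = event match {
    case Debited(amount)  => balance - amount
    case Credited(amount) => balance + amount
  }

  // Recovery: fold the stored events over the opening balance, oldest first.
  def replay(openBalance: Double, journal: Seq[Event]): Double =
    journal.foldLeft(openBalance)(applyEvent)
}
```

Calling `EventSourcingSketch.replay(100.0, journal)` with a journal that holds one credit of 50 and one debit of 30 rebuilds a balance of 120 without the balance itself ever being stored.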

Akka Persistence ships with a LevelDB journal plugin for storing events, but the Akka community provides many plugins for other databases, and we can also write our own custom plugin.

In the transaction example we use MongoDB to store events, and an in-memory database for the test cases. The plugins are akka-persistence-mongo and akka-persistence-inmemory.

Add Dependencies to build.sbt:

libraryDependencies ++= {
  val akkaVersion = "2.4.12"

  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "ch.qos.logback" % "logback-classic" % "1.1.7",
    "com.github.ironfish" %% "akka-persistence-mongo" % "1.0.0-SNAPSHOT",
    "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
    "org.scalatest" %% "scalatest" % "3.0.1" % "test",
    "com.github.dnvriend" %% "akka-persistence-inmemory" % "1.3.14" % "test"
  )
}

TransactionActor:


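import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}

// The commands (Debit, Credit, PrintBalance, GetBalance), the events
// (Debited, Credited) and TransactionService come from the example project.
class TransactionActor(openBalance: Double) extends PersistentActor with ActorLogging {

  // In-memory state, rebuilt from the persisted events on recovery.
  var balance = openBalance

  // Called for every persisted event while the actor is recovering.
  override def receiveRecover: Receive = {
    case event: Event =>
      log.info(s"$event Recover Start")
      updateBalance(event)

    case RecoveryCompleted => log.info("Balance recovery Completed")
  }

  // Called for every message once recovery has completed.
  // A Debit larger than the current balance matches no case and stays unhandled.
  override def receiveCommand: Receive = {
    case Debit(amount) if balance >= amount =>
      log.info(s"$amount: Amount Debit ... ")
      // Update the state and reply only after the event has been persisted.
      persist(Debited(amount)) { event =>
        updateBalance(event)
        sender() ! "Done Debit"
      }

    case Credit(amount) =>
      log.info(s"$amount: Amount Credit ... ")
      persist(Credited(amount)) { event =>
        updateBalance(event)
        sender() ! "Done Credit"
      }

    case PrintBalance => log.info(s"Remaining Balance: $balance")
    case GetBalance => sender() ! balance
  }

  // Identifies this actor's event stream in the journal.
  override def persistenceId: String = TransactionActor.name

  // Single place where events change the state; used by both handlers above.
  val updateBalance: Event => Unit = {
    case Debited(amount) => balance = TransactionService.balanceDebit(balance, amount)
    case Credited(amount) => balance = TransactionService.balanceCredit(balance, amount)
  }
}

object TransactionActor {

  def props(openBalance: Double): Props = Props(classOf[TransactionActor], openBalance)

  val name = "balance-transactions"
}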

Explanation:

An ordinary actor only needs to override one method, receive, to handle messages. A PersistentActor overrides two methods instead, receiveRecover and receiveCommand, and also has to define a persistenceId that identifies its event stream. The two methods work as follows:

  • receiveRecover: Is used to receive past events and snapshots while the actor is recovering.
  • receiveCommand: Is used to handle messages after the actor has recovered.

Example:

[Figure: Step-I and Step-II]

In the story above, Step-I creates a persistent actor with two methods, receiveRecover and receiveCommand. When the persistent actor starts, calls go to receiveRecover first: it receives the persisted events, applies each of them, and stores the result in the actor state.

After recovery, all actor messages are handled by receiveCommand. In this method the event is first persisted to the database, and only then is the action performed. If the event is not persisted successfully, the action is never performed. In Step-II the actor goes down, because of a poison pill or for any other reason, but all of its events are already in the database.
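The "persist first, then act" ordering can be illustrated with a small plain-Scala model. This is a sketch under stated assumptions, not Akka code: `Journal`, `Account`, and the `available` flag are hypothetical names invented here to simulate a database that may reject a write.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object PersistThenApplySketch {
  // Hypothetical stand-in for a journal; writes fail while `available` is false.
  final class Journal(var available: Boolean = true) {
    val events = ArrayBuffer.empty[Double]

    def append(delta: Double): Try[Unit] =
      if (available) { events += delta; Success(()) }
      else Failure(new IllegalStateException("journal unavailable"))
  }

  final class Account(openBalance: Double, journal: Journal) {
    private var current = openBalance
    def balance: Double = current

    // Persist first; change the in-memory state only when the write succeeded.
    def credit(amount: Double): Boolean =
      journal.append(amount) match {
        case Success(_) =>
          current += amount
          true
        case Failure(_) => false
      }
  }
}
```

In this model a rejected write leaves both the journal and the balance untouched, so the state never gets ahead of what is stored. In Akka itself a failed persist is stricter than this sketch: onPersistFailure is invoked and the actor is stopped.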

[Figure: Step-III]

In Step-III a new actor is created for the same job. Once again receiveRecover is called; it recovers all events and replays them in memory. After replaying every event, the actor is back in its last state and continues handling messages.
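The restart in Step-III works because the events are keyed by the actor's persistenceId, so any new instance with the same id finds the same event stream. A minimal plain-Scala sketch of that idea follows; `EventStore` and `Account` are hypothetical names for this illustration, and events are simplified to signed balance changes.

```scala
object RestartRecoverySketch {
  // Hypothetical shared store: persistenceId -> signed balance changes, oldest first.
  final class EventStore {
    private var streams = Map.empty[String, Vector[Double]]

    def read(id: String): Vector[Double] = streams.getOrElse(id, Vector.empty)

    def append(id: String, delta: Double): Unit =
      streams = streams.updated(id, read(id) :+ delta)
  }

  // Every new instance replays the stream for its persistenceId on construction.
  final class Account(persistenceId: String, openBalance: Double, store: EventStore) {
    private var current = store.read(persistenceId).foldLeft(openBalance)(_ + _)
    def balance: Double = current

    def credit(amount: Double): Unit = { store.append(persistenceId, amount); current += amount }
    def debit(amount: Double): Unit = { store.append(persistenceId, -amount); current -= amount }
  }
}
```

A second `Account` created with the same id and store starts from the balance the first one reached, while an `Account` with a different id starts from its opening balance. This is also why two live actors should never share one persistenceId.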

To download the complete source code of the example, click on this link.

Akka Persistence also supports snapshots, persistence queries, and clustering, all of which we will discuss in our next blogs.


Written by 

Harmeet Singh is a lead consultant with more than 5 years of experience. He has expertise in Scala, Java, JVM, and functional programming. On the personal front, he is a food lover.
