Akka Persistence: Making Actor Stateful


Akka is a toolkit for designing scalable, resilient systems that span processor cores and networks. Akka allows you to focus on meeting business needs instead of writing low-level code to provide reliable behavior, fault tolerance, and high performance.

An Akka actor can hold state, but that state is lost when the actor is stopped or crashes. Fortunately, we can persist actor state using Akka Persistence, which is one of the Akka extensions.

Akka Persistence consists mainly of the following two modules:


1) Akka Persistence: The main goal of the Akka Persistence module is to persist a stateful actor's internal state to a storage backend such as a data store, the file system, or memory. When the actor is started, restarted by its supervisor, or brought back after a JVM crash, Akka Persistence uses that storage to recover the actor to its previous state. The key concept is that only the events persisted by the actor are stored, not the actor's actual state (though snapshots of actor state are also supported). A stateful actor is recovered by replaying the stored events to it, allowing it to rebuild its state.

2) Akka Persistence Query: Akka Persistence Query complements Persistence by providing a universal, asynchronous, stream-based query interface that journal plugins can implement to expose their query capabilities.
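The recovery idea from item 1 (rebuild state by replaying stored events) can be sketched in plain Scala without any Akka dependency. This is only an illustration: the names CounterEvent, Incremented, Decremented, CounterState, and ReplaySketch are hypothetical and are not part of Akka or of the example later in this post.

```scala
// Hypothetical event and state types, for illustration only
sealed trait CounterEvent
final case class Incremented(by: Int) extends CounterEvent
final case class Decremented(by: Int) extends CounterEvent

final case class CounterState(count: Int)

object ReplaySketch {

  // Pure transition: how a single stored event changes the state
  def applyEvent(state: CounterState, event: CounterEvent): CounterState =
    event match {
      case Incremented(by) => CounterState(state.count + by)
      case Decremented(by) => CounterState(state.count - by)
    }

  // Recovery: start from a snapshot (or the empty state) and replay
  // the journaled events in the order they were stored
  def recover(journal: Seq[CounterEvent],
              snapshot: CounterState = CounterState(0)): CounterState =
    journal.foldLeft(snapshot)(applyEvent)
}
```

For example, ReplaySketch.recover(Seq(Incremented(5), Decremented(2), Incremented(1))) yields CounterState(4). Passing a snapshot as the second argument models recovery that starts from saved state and replays only the events stored after it.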

Configure Akka Persistence

Dependency for Akka Persistence:

libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.5.21"

LevelDB-based plugins will require the following additional dependency:

libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
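Besides the dependencies, the journal and snapshot store plugins have to be selected in configuration. The sketch below assumes the LevelDB journal and local snapshot store that ship with Akka 2.5; the directory paths and the object name PersistenceConfigSketch are placeholders chosen for illustration. The same keys would normally go directly into application.conf.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object PersistenceConfigSketch {

  // Plugin selection for the bundled LevelDB journal and local snapshot store.
  // The "dir" values are example paths (assumption), adjust them as needed.
  val config: Config = ConfigFactory.parseString(
    """
      |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
      |akka.persistence.journal.leveldb.dir = "target/example/journal"
      |akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
      |akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
      |""".stripMargin)
}
```

One way to use it is ActorSystem("example", PersistenceConfigSketch.config.withFallback(ConfigFactory.load())), so that the settings above override the defaults.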

Architecture


  • PersistentActor: A persistent, stateful actor. It can persist events to a journal and react to them in a thread-safe manner. It can be used to implement both command and event sourced actors. When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can recover its state from these messages.
  • AtLeastOnceDelivery: Sends messages with at-least-once delivery semantics to destinations, even if the sender or receiver JVM crashes.
  • AsyncWriteJournal: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The journal maintains a highestSequenceNr that is increased on each message. The storage backend of a journal is pluggable. The persistence extension comes with a “leveldb” journal plugin, which writes to the local filesystem. Replicated journals are available as community plugins.
  • Snapshot store: A snapshot store persists snapshots of a persistent actor’s state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The persistence extension comes with a “local” snapshot storage plugin, which writes to the local filesystem. Replicated snapshot stores are available as community plugins.
  • Event sourcing: Based on the building blocks described above, Akka Persistence provides abstractions for developing event sourced applications.
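To make the snapshot store item above more concrete, here is a minimal sketch of a persistent actor that saves a snapshot every 100 events. The actor, its messages (Add, Added, Total), and the interval are assumptions made up for this illustration. The calls saveSnapshot and lastSequenceNr and the messages SnapshotOffer, SaveSnapshotSuccess, and SaveSnapshotFailure come from akka.persistence.

```scala
import akka.persistence.{PersistentActor, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer}

final case class Add(amount: Int)   // hypothetical command
final case class Added(amount: Int) // hypothetical event
final case class Total(value: Int)  // hypothetical state

object SnapshottingActor {
  // Assumed interval: take a snapshot after every 100 persisted events
  val SnapshotEvery = 100L

  def shouldSnapshot(sequenceNr: Long): Boolean =
    sequenceNr % SnapshotEvery == 0
}

class SnapshottingActor extends PersistentActor {
  import SnapshottingActor._

  override val persistenceId: String = "snapshotting-actor-1"

  private var state = Total(0)

  override def receiveRecover: Receive = {
    // The latest snapshot (if any) is offered first ...
    case SnapshotOffer(_, snapshot: Total) => state = snapshot
    // ... followed by the events persisted after that snapshot
    case Added(amount) => state = Total(state.value + amount)
  }

  override def receiveCommand: Receive = {
    case Add(amount) =>
      persist(Added(amount)) { event =>
        state = Total(state.value + event.amount)
        // Saving is asynchronous; the outcome arrives as a message below
        if (shouldSnapshot(lastSequenceNr)) saveSnapshot(state)
      }
    case SaveSnapshotSuccess(metadata) =>
      println(s"Snapshot saved at sequence number ${metadata.sequenceNr}")
    case SaveSnapshotFailure(metadata, cause) =>
      println(s"Snapshot at ${metadata.sequenceNr} failed: ${cause.getMessage}")
  }
}
```

With a snapshot in place, recovery only has to replay the events stored after it, which is how snapshots shorten recovery times.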

Now let us take an example to understand it better. You can find the complete code explained here on my GitHub repository.





import akka.actor.Props
import akka.persistence.{PersistentActor, SnapshotOffer}
import com.knoldus.models._
import com.knoldus.persistence.CounterPersistentActor.Response

class CounterPersistentActor(id: String) extends PersistentActor {

  // Identifies this actor's events in the journal
  override val persistenceId: String = id

  var state = State(count = 0)

  // Applies an event to the in-memory state
  def updateState(event: Event): Unit = {
    event match {
      case Event(Increment(count)) => state = State(state.count + count)
      case Event(Decrement(count)) => state = State(state.count - count)
    }
  }

  // Invoked during recovery with the stored snapshot and events
  override def receiveRecover: Receive = {
    case event: Event =>
      println("Actor is currently recovering its state")
      updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      println(s"Snapshot data: $snapshot")
      state = snapshot
  }

  // Handles incoming commands during normal operation
  override def receiveCommand: Receive = {
    case command @ Command(op) =>
      println(s"$command is under process")
      // The handler runs only after the event has been persisted
      persist(Event(op)) { event =>
        updateState(event)
        sender() ! Response("Done Processing")
      }
    case Checkpoint =>
      println(s"Current State: ${state.count}")
      sender() ! Response(s"Current State: ${state.count}")
  }
}

object CounterPersistentActor {
  def props(id: String) = Props(new CounterPersistentActor(id))

  case class Response(message: String)
}
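The listing imports com.knoldus.models._ but the model classes themselves are not shown here. The sketch below is a guess at what they might look like, inferred only from how the actor uses them; the Operation trait and the ModelSketch helper are assumptions, and the definitions in the repository may differ.

```scala
// Assumed shape of the models; in the project these would live in
// package com.knoldus.models
sealed trait Operation
final case class Increment(count: Int) extends Operation
final case class Decrement(count: Int) extends Operation

final case class Command(op: Operation) // sent to the actor
final case class Event(op: Operation)   // persisted to the journal
final case class State(count: Int)      // in-memory actor state
case object Checkpoint                  // asks the actor for its current state

object ModelSketch {
  // Pure version of the actor's updateState, handy for unit tests
  def next(state: State, event: Event): State = event match {
    case Event(Increment(count)) => State(state.count + count)
    case Event(Decrement(count)) => State(state.count - count)
  }
}
```

With these definitions, ModelSketch.next(State(0), Event(Increment(3))) gives State(3), which mirrors what the actor does to its own state after a successful persist.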

To create a persistent actor, we need to extend the PersistentActor trait and override these methods:

1) The receiveRecover method defines how the state is updated during recovery by handling Event and SnapshotOffer messages.

2) The receiveCommand method is the command handler. A command is handled by generating an event, which is then persisted and handled. Events are persisted by calling persist with an event (or a sequence of events) as the first argument and an event handler as the second argument.

The persist method persists events asynchronously, and the event handler is executed for successfully persisted events.
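When a single command produces several events, Akka's persistAll can be used; it takes an immutable sequence of events and invokes the handler once per persisted event. The sketch below is a separate, made-up example: the BatchActor class and its messages (AddMany, AmountAdded, GetTotal) are assumptions and are not part of the counter example above.

```scala
import akka.persistence.PersistentActor

final case class AddMany(amounts: List[Int]) // hypothetical command
final case class AmountAdded(amount: Int)    // hypothetical event
case object GetTotal                         // hypothetical query

object BatchActor {
  // One command is turned into one event per amount
  def toEvents(command: AddMany): List[AmountAdded] =
    command.amounts.map(amount => AmountAdded(amount))
}

class BatchActor extends PersistentActor {

  override val persistenceId: String = "batch-actor-1"

  private var total = 0

  override def receiveRecover: Receive = {
    case AmountAdded(amount) => total += amount
  }

  override def receiveCommand: Receive = {
    case command: AddMany =>
      // The handler is called once for each successfully persisted event
      persistAll(BatchActor.toEvents(command)) { event =>
        total += event.amount
      }
    case GetTotal =>
      sender() ! total
  }
}
```

Keeping the command-to-events mapping in a pure function such as toEvents makes that part easy to test without starting an actor system.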

This is all for now. In my next blog, I will cover the other persistence module, namely Akka Persistence Query.

Thanks for your time…!!

Reference:

Akka Persistence




Written by 

I am a Software Consultant at Knoldus Inc. I am a Scala enthusiast. I am familiar with object-oriented programming paradigms and have also worked on .NET-based technologies. Aside from being a programmer, I am familiar with NoSQL database technologies such as Cassandra. I have also worked on the Lagom microservice architecture.