DATA PERSISTENCE IN LAGOM

Reading Time: 5 minutes

Are you finding it difficult to understand lagom persistence? Don’t worry because help is right here.
In this blog, we will learn about lagom persistence with the help of a simple application and also discuss its theoretical aspects.
Before we begin, make sure you know about Event Sourcing and CQRS. You can read about it in details from this link .

Choosing a database

When we create any microservice, or in general any service, one of the biggest task is to manage data persistence. Lagom supports various databases for doing this task. By default, Lagom uses Cassandra to persist data. Tables, required to store data, are saved in cassandra keyspaces.
So, For now, we will be using Cassandra for storing our data. Our service basically creates a user on request and store the correspondng details in the database.

To use Cassandra, you need to add the following in your project’s build.sbt:

libraryDependencies += lagomScaladslPersistenceCassandra

Lagom requires keyspace configuration for three internal components – Journal, snapshot and offset.
Journal stores serialized events, Snapshots are automatically saved after a configured number of persisted events for faster recovery and Offset store provides read-side support.

Each microservice should have a unique keyspace name so that the tables of different services do not conflict with each other. However, You can use same keyspace for all of these components within one service.
To configure keyspace names, you need to add the following in your service implementations’ application.conf file:

play.application.loader = com.knoldus.user.impl.service.UserServiceLoader

user.cassandra.keyspace = userdatabase

cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}

By default, Cassandra server is started on port 4000. However, You can disable the embedded Cassandra server by adding the following in your build.sbt and can use the external cassandra running on your local host.

lagomCassandraEnabled in ThisBuild := false
lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "http://localhost:9042")

Persistent Entity

Lagom’s persistence can be handled by defining ‘PersistentEntity‘. Each instance of the entity has a stable entity identifier through which it can be accessed from service implementation or anywhere in the cluster. It is run by an actor and the state is durable using event sourcing.
To use lagom persistence, you need to define an entity class, that should extend PersistentEntity abstract class and override the abstract type members and method.

class UserEntity extends PersistentEntity

Three absrtact type members – Command, Event and State must be defined by the subclass.

override type Command = UserCommand[_]
override type Event = UserEvent
override type State = UserState

1. Command

You can interact with PersistentEntity by sending command messages to it. Commands are instructions to do something, like create user account, fetch user details, etc. Each command must implement the PersistentEntity.ReplyType interface to define reply type. Here is an example for how you need to define the commands for your application.

trait UserCommand[R] extends ReplyType[R]

case class CreateUserCommand(user: User) extends UserCommand[Done]

2. Event

A command may cause changes to the entity state, and those changes are stored as events. Events are the immutable facts of things that have happened like Account created or updated. Example for defining the event is given below.

sealed trait UserEvent extends AggregateEvent[UserEvent] {
  override def aggregateTag: AggregateEventTagger[UserEvent] = UserEvent.Tag
}
object UserEvent {
  val Tag: AggregateEventTag[UserEvent] = AggregateEventTag[UserEvent]
}
case class UserCreated(user: User) extends UserEvent

3. State

And, State is the condition that entity is in at specific instance. Events are replayed to recreate the current state of an entity. Below is an example to define the state. You can modify it according to your requirements.

case class UserState(user: Option[User], timeStamp: String)

4. InitialState

Your entity class should also implement abstract method ‘initialState‘ which defines the state of the entity when it is created for the first time.

override def initialState = UserState(None, LocalDateTime.now().toString)

5. Behavior

Another method that your concrete subclass should implement is ‘behavior‘. The behavior is defined as a set of actions or functions.

override def behavior: (UserState) => Actions
  • Command Handlers

To process commands, Command handlers are registered using ‘onCommand’ of the Actions. A command handler is a partial function with 3 parameters – Command, CommandContext(ctx) and current State. A command handler returns a Persist directive that defines what event or events, if any, to persist. thenPersist, thenPersistAll or done methods are used to create the Persist directive.

.onCommand[CreateUserCommand, Done] {
  case (CreateUserCommand(user), ctx, _) ⇒
    ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
}

A PersistentEntity may also process commands that do not change application state, such as query commands. Such command handlers are registered using ‘onReadOnlyCommand’.

.onReadOnlyCommand[GetUserCommand, User] {
   case (GetUserCommand(id), ctx, state) =>
    ctx.reply(state.user.getOrElse(User(id, "not found")))
}
  • Event Handlers

Event handlers are used both for persisting and replaying events. These are registered with the ‘onEvent’ method of the Actions. When an event has been persisted successfully, the current state is updated.

.onEvent {
  case (UserCreated(user), _) ⇒
    UserState(Some(user), LocalDateTime.now().toString)
}

A reply is sent with the ctx.reply method and reply message type must match the ReplyType defined by the command. It will be an acknowledgment that the entity has processed the command successfully.
You can use ctx.invalidCommand to reject an invalid command, which will fail the Future with PersistentEntity.InvalidCommandException on the sender side.

Here is the complete snapshot of UserEntity class:

class UserEntity extends PersistentEntity {

  override type Command = UserCommand[_]
  override type Event = UserEvent
  override type State = UserState

  override def initialState = UserState(None, LocalDateTime.now().toString)

  override def behavior: (UserState) => Actions = {
    case UserState(_, _) => Actions()
      .onCommand[CreateUserCommand, Done] {
      case (CreateUserCommand(user), ctx, _) ⇒
        ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
    }
      .onReadOnlyCommand[GetUserCommand, User] {
       case (GetUserCommand(id), ctx, state) =>
        ctx.reply(state.user.getOrElse(User(id, "not found")))
    }
      .onEvent {
        case (UserCreated(user), _) ⇒
          UserState(Some(user), LocalDateTime.now().toString)
      }
  }

}

Finally, to access the entity, you need to inject the PersistentEntityRegistry in your service implementation class.

class UserServiceImpl(persistentEntityRegistry: PersistentEntityRegistry)

And also, you need to register the persistent entity and Json serailizer registry in your application loader.

//Register the JSON serializer registry
override lazy val jsonSerializerRegistry = UserSerializerRegistry

// Register the lagom-persistent-entity-demo persistent entity
persistentEntityRegistry.register(wire[UserEntity])

In the service method you can retrieve a PersistentEntityRef for a given entity identifier from the registry. In the user application, a separate method for retrieving the Ref is being created.

def ref(id: String): PersistentEntityRef[UserCommand[_]] = {
  persistentEntityRegistry.refFor[UserEntity](id)
}

Then you can send the command to the entity using the ask method of the PersistentEntityRef. It returns a Future with the reply message. An example for that will be sending CreateUserCommand to the user entity.

ref(user.id).ask(CreateUserCommand(user))

And this is how Lagom’s persistence helps in managing data for your service.

Now, when you will run the application, you can see in cassandra query language shell(cqlsh) that keyspace named ‘userdatabase’ will get created having 4 tables – messages, config, snapshots and metadata. Events are actually persisted in messages table.
The complete demo code is available here. You can check README.md file for instructions to run the application.

References:

Hope you enjoyed reading this blog !!


knoldus-advt-sticker


 

Written by 

I am a Software Consultant, having experience of more than 1 year. I am well versed with Object Oriented Programming Paradigms having good command of programming languages like Scala, Java & C++ and also skilled in building the microservices architecture based application using Lagom, Cassandra, Elasticsearch and many more. My hobbies include reading novels, writing blogs, drawing, listening to music.

11 thoughts on “DATA PERSISTENCE IN LAGOM6 min read

Comments are closed.