Understanding data persistence in Lagom

Reading Time: 4 minutes

Hello everyone, in this blog, I am going to explain what is data persistence in lagom.  First of all,Before understanding the concept of data persistence, I suggest you understand the concept of CQRS because people who switch from monolithic to the microservice architecture they find it quite difficult to adopt the concept of CQRS. You can go through this blog to get a good understanding of CQRS.

What is a Persistent Entity

A PersistentEntity in lagom has a stable entity identifier.  With the help of the identifier, the entity can be accessed from the service implementation. The state of an entity is persistent using Event Sourcing.

Event sourcing: Event sourcing is a simple concept where all the changes in an entity are captured as a series of events.

Aggregate: Aggregate is a larger unit of encapsulation than just a class. Every transaction is scoped into a single aggregate. A persistent entity forms a tree or graph structure of object relations. The root of the tree is called an aggregate root which speaks for the whole and it may delegate down to the rest.

How to use lagoms persistent entity:

The use of lagoms persistent entity is shown with the help of an eg below-

import com.lightbend.lagom.scaladsl.persistence.PersistentEntity

class CustomerEntity  extends PersistentEntity {

  override type Command = BlogCommand
  override type Event   = BlogEvent
  override type State   = BlogState
  override def initialState: BlogState = BlogState.empty
  override def behavior: Behavior = Actions()

}

In the eg above we have created a class CustomerEntity that extends PersistentEntity abstract class and overrides three abstract type members i.e command, event and state.

  • Command – the superclass/interface of the commands
  • Event – the superclass/interface of the events
  • State – the class of the state

1. Commands

To interact with PersistentEntity, we have to send some command messages to it. Commands are instructions to do something, like create an account for a customer, fetch the details of a customer, etc.

trait CustomerCommand[R] extends ReplyType[R]
case class CreateCustomerCommand(customer: CustomerDetails) extends CustomerCommand[Done]

2. Events

A command may result in a change of state. Changes that are persisted as events, representing the effect of the command. For eg:

sealed trait CustomerEvent extends AggregateEvent[CustomerEvent] {
override def aggregateTag: AggregateEventTagger[CustomerEvent] =     CustomerEvent.Tag
}

object CustomerEvent {
 val Tag: AggregateEventTag[CustomerEvent] = AggregateEventTag[CustomerEvent]
}

case class Customer(customer: CustomerDetails) extends CustomerEvent

object Customer {
implicit val format: Format[Customer] = Json.format
}

3.State

The state is the condition that an entity is in a specific instance. You can modify it according to your requirements.

case class CustomerState(customer: Option[CustomerDetails]

4. InitialState

It is the state of an entity that defines when the entity was first created. Your entity class should also implement the abstract method initialState. For eg:

override def initialState = CustomerState(None)

5. Behavior

It defines the behavior of the entity. Behaviour is a function from the current state to actions that defines command and event handlers. Your entity class should also implement the abstract method behavior. The behavior is defined as a set of actions or functions

override def behavior: (UserState) => Actions

6.Actions

The behavior function process incoming commands and persisted events.

7.Command Handlers

Command handlers are required to process commands. A command handler is a partial function with 3 parameters ,i.e Command, CommandContext(ctx) and current State.A command handler returns a Persist directive. This directive defines what event or events to be persisted

  • thenPersist: this method will persist one single event
  • thenPersistAll: this method will persist in several events atomically.
  • done: no events are to be persisted
.onCommand[CreateCustomerCommand, Done] {
 case (CreateCustomerCommand(cust), ctx, _) =>
   ctx.thenPersist(Customer(cust))(_ ⇒ ctx.reply(Done))
}

A PersistentEntity may also process commands that do not change application state, such as query commands or commands that are not valid in the entity’s current state. Actions onReadOnly command registers such command. For eg:

.onReadOnlyCommand[GetCustomerCommand, CustomerDetails] {
 case (GetCustomerCommand(id), ctx, state) =>
   ctx.reply(state.customer.getOrElse(CustomerDetails(id, "name not found ", "email not found")))
}

8.Event Handlers

When an event has been persisted successfully, the current state is updated by applying the event to the current state. The onEvent method of Actions updates the state.

.onEvent {
 case (_, state) =>
   state
}

A reply is sent with the ctx.reply method and replies message type must match the ReplyType defined by the command. It will be an acknowledgment that the entity has processed the command successfully.

Below is a complete eg of complete CustomerEntity class-

class CustomerEntity extends PersistentEntity {
 override type Command = CustomerCommand[_]
 override type Event = CustomerEvent
 override type State = CustomerState
 override def initialState = CustomerState(None)
 override def behavior: (CustomerState) => Actions = {
  case CustomerState(_) => Actions()
    .onCommand[CreateCustomerCommand, Done] {
       case (CreateCustomerCommand(cust), ctx, _) =>
       ctx.thenPersist(Customer(cust))(_ ⇒ ctx.reply(Done))
     }
    .onReadOnlyCommand[GetCustomerCommand, CustomerDetails] {
      case (GetCustomerCommand(id), ctx, state) =>
      ctx.reply(state.customer.getOrElse(CustomerDetails(id, "name not found ", "email not found")))
     }
     .onEvent {
       case (_, state) => state
       }
    }
}

Now to access the entity, you need to inject the PersistentEntityRegistry in your service implementation class.

class UserServiceImpl(persistentEntityRegistry: PersistentEntityRegistry)

And also, you need to register the persistent entity and Json serializer registry in your application loader.

//Register the JSON serializer registry
override lazy val jsonSerializerRegistry = CustomerSerializerRegistry

// Register the lagom-persistent-entity-demo persistent entity
persistentEntityRegistry.register(wire[CustomerEntity])

9. Storing Persistent Events in lagom

By default, Lagom uses Cassandra to persist data. Lagom stores data in tables. Cassandra keyspaces save the stored data.

Project dependencies:

To use Cassandra add the following in your project’s build

In sbt, add the following in your build.sbt file

libraryDependencies += lagomScaladslPersistenceCassandra

Cassandra keyspace names must start with an alphanumeric character and contain only alphanumeric and underscore characters. They are case-insensitive and stored in lowercase.

Lagom has three internal components that require keyspace configuration:

  • The journal stores serialized events
  • The snapshot store stores snapshots of the state as an optimization for faster recovery
  • Cassandra Read-Side support uses the offset store.The Read-side processor uses it to keep track of the most recent event.

To configure keyspace names, you need to add the following in your service implementations’ application.conf file:

play.application.loader = com.knoldus.customer.impl.CustomerServiceLoader

user.cassandra.keyspace = customerdatabase
cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}

By default, Cassandra server startes on port 4000. However, You can disable the embedded Cassandra server by adding the following in your build.sbt and can use the external Cassandra running on your localhost.

lagomCassandraEnabled in ThisBuild := false

lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "http://localhost:9042").

That’s all about data persistence in lagom. Below is the link to my Github repo to demonstrate data persistence in lagom. In my next blog, I am going to explain how to explain the persistent read side in lagom. Till then, stay tuned.

Link to the GitHub repo: https://github.com/kaunath933/data-persistence-in-lagom

References:

Persistent Entity


Knoldus-blog-footer-image>

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading