Persistent Entity in Lagom

Reading Time: 4 minutes

Lagom is an open source framework for building systems of Reactive microservices in Java or Scala. It is build on Akka and Play. It has a concept of persistent entity and in this blog, we will learn about persistent entity in Lagom in detail. First of all, before reading about persistent entity it would be better if you have some knowledge about Event Sourcing and CQRS.

Persistent Entity

A persistent entity has an entity identifier by which it can be accessed from the service implementation or other parts of the application. They are used for holding the state of the individual entities. The use of event sourcing is done for persistence that means all the changes in the state are captured as events that are immutable in nature.

Choosing a database

You need to choose a database to manage and persist the data in your microservice or application. Some of the databases that Lagom supports are following:

  • Cassandra
  • PostgreSQL
  • MySQL
  • Oracle
  • H2

By default, Lagom uses Cassandra. It also provides embedded Cassandra so that the developer doesn’t have to worry about installing, configuring or managing Cassandra. Embedded Cassandra runs on port 4000. You can use runAll command(starts all services) to start embedded Cassandra or lagomCassandraStart command that only starts Cassandra.

To use Cassandra, add following to your build.sbt:

libraryDependencies += lagomScaladslPersistenceCassandra

In a configuration file you will have to provide Cassandra keyspace that will be different for each service. Also you will have to provide configuration for the following components :

  • journal: stores serialised events
  • snapshot-store: stores snapshots that helps in faster recovery
  • offset-store: keep track of the most recent event handled

The configuration file will look like following:

play.application.loader = com.knoldus.lagompersistententity.impl.service.LagomPersistentEntityLoader

lagom-persistent-entity.cassandra.keyspace = lagom_persistent_entity

cassandra-journal.keyspace = ${lagom-persistent-entity.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${lagom-persistent-entity.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${lagom-persistent-entity.cassandra.keyspace}

Persistent Entity Sub-class

PersistentEntity is an abstract class that has abstract type members and methods. You need to extend the class and override the abstract types and methods.

class InventoryEntity extends PersistentEntity {
  override type Command = InventoryCommand[_]
  override type Event = InventoryEvent
  override type State = InventoryState

  override def initialState: InventoryState = InventoryState(None,LocalDateTime.now().toString)

  override def behavior: InventoryState => Actions
}

Abstract types are following:

Command

It is a super class/interface for the commands. By using commands you can interact with persistent entity and it may result in change in state. A command must define what type of message to use as reply to the command by implementing the PersistentEntity.ReplyType interface. For instance a command can be AddProduct,GetProduct etc.You can define your commands in the following way :

trait InventoryCommand[R] extends ReplyType[R]

case class AddProductCommand(product: Product) extends InventoryCommand[Done]

object AddProductCommand {
  implicit val format: Format[AddProductCommand] = Json.format
}

Event

It is a super class/interface for the events. The change in state is due to commands that are persisted as events. Events basically show the effect of command. An event can be ProductAdded. You can define your events in the following way:

sealed trait InventoryEvent extends AggregateEvent[InventoryEvent] {
  override def aggregateTag: AggregateEventTagger[InventoryEvent] = InventoryEvent.Tag
}

object InventoryEvent {
  val Tag: AggregateEventTag[InventoryEvent] = AggregateEventTag[InventoryEvent]
}

case class ProductAdded(product: Product) extends InventoryEvent

State

It is a class of the State. State represents the condition of an entity at a specific instance. When the entity is started, current state is recovered by replaying the stored events. Here, snapshots play an important role. The entity may start replaying events after the snapshot of the state. This can reduce the time of recovering the state as less number of events will be replayed. You can define the state in the following way:

case class InventoryState(product: Option[Product],time: String)

Abstract methods to be defined are following:

initialState

It defines the state when the entity is first created. It can be following:

override def initialState = InventoryState(None,LocalDateTime.now().toString)

Behaviour

It returns the Behaviour of the entity. Behaviour is a function from current State to Actions. Actions() is used to create an immutable builder for defining the behavior. It has event handlers and command handlers

override def behavior: InventoryState => Actions

Command Handlers

It processes the incoming commands. A command handler is a partial function with 3 parameters that are following:

  • Command
  • CommandContext
  • Current State.

Following are different functions of Actions() that handles the commands:

  • onCommand: A PersistentEntity processes many commands that change application state. Here, you have a choice to persist the event. In this scenario you can use onCommand.
 Actions()
      .onCommand[AddProductCommand, Done] {
        case (AddProductCommand(product), ctx, _) =>
          ctx.thenPersist(ProductAdded(product))(_ ⇒ ctx.reply(Done))
      }
  • onReadOnlyCommand: A PersistentEntity may also process commands that do not change application state, such as query commands or commands that are not valid in the entity’s current state. In this scenario you can use onReadOnlycommand.
Actions()
      .onReadOnlyCommand[GetProductCommand, Product] {
        case (GetProductCommand(id), ctx, state) =>
          ctx.reply(state.product.getOrElse(Product(id, "not found", -1)))
      }

A command handler returns a persist directive that defines what event or events to persist.

The functions of context that can be used to create the persist directive are following:

  • thenPersist: will persist a single entity
  • thenPersistAll: will persist several events automatically. In this either all the events will be persisted or none if an error occur.
  • done: no events are to be persisted

Event Handlers

The event handler updates the current state. The state will only be updated if any event has been persisted. You can use onEvent function to do the above.

Actions()
      .onEvent {
        case (ProductAdded(product), _) =>
          InventoryState(Some(product), LocalDateTime.now().toString)
      }

Using PersistentEntity in Application

To use an entity in service implementation you have to inject the PersistentEntityRegistry. You can do it in the following way:

class LagomPersistentEntityServiceImpl(persistentEntityRegistry: PersistentEntityRegistry)(implicit ec: ExecutionContext) extends LagomPersistentEntityService 

You can get a PersistentEntityRef in the service implementation and can use following method to do that

 def ref(id: String): PersistentEntityRef[InventoryCommand[_]] = {
    persistentEntityRegistry
      .refFor[InventoryEntity](id)
  }

Also for doing this, you should register persistent entity and JSON serializer registry in your Application Loader as follows:

override lazy val jsonSerializerRegistry: JsonSerializerRegistry = UserSerializerRegistry
persistentEntityRegistry.register(wire[InventoryEntity])

For the full code you can visit this.

That’s all about Persistent Entity in Lagom. I hope this was helpful. 🙂

References:

Knoldus-blog-footer-image