Lagom Read side and Write side

Reading Time: 3 minutes

Lagom is a framework that is used for building reactive microservices. It provides support for building read side and write side views of the data so that querying a microservice becomes easier.

In this blog, I’ll discuss about how having a read side view is beneficial and also important in microservices. Before reading this blog I’ll recommend you to have some knowledge about Event Sourcing and CQRS. Also, I hope you are clear with the concept of persistent entity. If not, then you can study it here.

Read-side and Write-side

The write side view is where we use commands and events. A command may result in persisting an event and change in current state.

The read side view is that in which we use queries from the service. Lagom provides ways to populate this read-side view.

Read side and write side in a microservice
Read side and write side in a microservice

This separation of the write side and the read side of the persistent data is known as CQRS (Command Query Responsibility Segregation) pattern.

Need of Read-Side View

Consider a scenario, in which you need to get the details of all the products whose supplier is ABC. For doing this, it’s easy to just query the database rather than giving many commands. Such scenarios raise the need of Read Side view of the data.

Read side and write side in a microservice in Lagom

To implement read side you can choose any database. Lagom provides some helpers for Cassandra so I’ll use Cassandra here. One thing that needs to be assured is that, the read-side should only be updated in response to receiving events from persistent entities. To handle these events you need to provide a ReadSideProcessor.

ReadSideProcessor

  • responsible for handling the events produced by the persistent entity
  • responsible for tracking which events it has handled
  • transforms the events in the form of tables

Implementing the ReadSideProcessor

You need to create a class that extends ReadSideProcessor and override it’s abstract methods. Also inject CassandraSession and CassandraReadSide.

Abstract methods that you need to override are following:

aggregateTags

This method has to return a list of all the tags that our processor will handle. To implement this method, simply return the list of all the events for your class:

override def aggregateTags: Set[AggregateEventTag[InventoryEvent]] =
  InventoryEvent.Tag.allTags

buildHandler

It is responsible for handling the events. CassandraReadSide has a builder method for creating a builder for these handlers. This builder will create a handler that will automatically handle the offsets.

readSide.builder[InventoryEvent]("inventoryOffset")

We have two callbacks here, that are following:

  • globalPrepare

It runs at least once across the cluster. It is used to do things like creating a table or preparing data that has to be there before read side processing. Although it is optional to use this callback but it is highly recommended as it saves you from manually creating table. Also if you have this callback then readside processing won’t start before this callback has been run successfully.

Here, I’m using createTable for globalPrepare callback:

  def createTable(): Future[Done] = {
    cassandraSession.executeCreateTable(
      s"""
         |CREATE TABLE IF NOT EXISTS $TABLE_NAME(
         |id text PRIMARY KEY,
         |name text,
         |quantity bigint
         |);
      """.stripMargin)
  }

register it with the builder

.setGlobalPrepare(createTable)
  • prepare

It runs once per shard after the read side processor is started. It can be used to prepare statements so that Cassandra’s handling can be optimized.

Using insert statement for prepare:

  def prepareStatements(): Future[Done] =
    cassandraSession.prepare(s"INSERT INTO $TABLE_NAME(id, name, quantity) VALUES (?, ?, ?)")
      .map { ps =>
        addEntity = ps
        Done
      }

register it with the builder

.setPrepare(_ => prepareStatements())

Event Handler

It is actually responsible for handling the event. It takes an event and returns a list of bound statements.

Here, it is

def addEntity(product: Product): Future[List[BoundStatement]] = {
    val bindInsertProduct: BoundStatement = addEntity.bind()
    bindInsertProduct.setString("id", product.id)
    bindInsertProduct.setString("name", product.name)
    bindInsertProduct.setLong("quantity", product.quantity)
    Future.successful(List(bindInsertProduct))
  }

register this with builder too

.setEventHandler[ProductAdded](ese => addEntity(ese.event.product))

Complete buildHandler will be like:

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[InventoryEvent] =
    readSide.builder[InventoryEvent]("inventoryOffset")
      .setGlobalPrepare(createTable)
      .setPrepare(_ => prepareStatements())
      .setEventHandler[ProductAdded](ese => addEntity(ese.event.product))
      .build()

Registering your ReadSideProcessor

You have to register your ReadSideProcessor in your Application Loader. You can do this in the following way:

readSide.register(wire[InventoryReadSideProcessor])

Look for the full code here.

That’s all about Lagom Read and Write side! I hope it was helpful.

References

Knoldus-blog-footer-image