Persistent Read Side in Lagom

Here in this blog, we will be discussing how we can query the lagom’s microservices for retrieving data. I hope you are clear with persistent entity concepts in Lagom , if not, you can take a quick overview by going through this blog.

Lagom handles data persistence by using ‘Persistent Entity’ which holds the state of individual entities, but to interact with them one must know the identifier of the entity. Hence, Lagom provides support to build read-side view of the persistent data which can be used for querying purpose.
And, this separation of the write-side and the read-side of the persistent data is often referred to as the CQRS (Command Query Responsibility Segregation) pattern.

Read-Side Processor

Read side can be implemented using any database. For now, we will be using Cassandra to understand its concepts.

One thing to keep in mind is that read side should only be updated in response to events received from persistent entities.
This is done by building a ReadSideProcessor which will transform the events generated by the Persistent Entities into database tables that can be queried, and at the same time it keeps track of which events it has handled by using offsets.

Each event produced by a persistent entity has an offset. When a read-side processor first starts, it loads the offset of the last event that is processed, and whenever an event is processed, it stores its offset.

This is how ReadSideProcessor class looks like:

class UserProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext)
  extends ReadSideProcessor[UserEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[UserEvent] = ???

  override def aggregateTags: Set[AggregateEventTag[UserEvent]] = ???
}

We have injected the Cassandra session and Cassandra read-side support.
The above class extends ReadSideProcessor[T] trait and we need to implement two methods – aggregateTags and buildHandler. But before we discuss them in detail, we first need to understand how we can tag the events.

Event Tags

In order to consume events from a read-side, the events need to be tagged. Events can be tagged by making them implement the AggregateEvent interface. The tag is defined using the aggregateTag method. Here is an example of how you can tag the event.

trait UserEvent extends AggregateEvent[UserEvent] {
  override def aggregateTag: AggregateEventTagger[UserEvent] = UserEvent.Tag
}

object UserEvent {
  val numberOfShards = 4
  val Tag: AggregateEventShards[UserEvent] = AggregateEventTag.sharded[UserEvent](numberOfShards)
}

Now, let’s discuss about the two methods that we need to implement to create ReadSideProcessor.

1. aggregateTags

This method returns a list of all the tags that our processor will handle. To do this, we will simply return the list of all the events for our class.

override def aggregateTags: Set[AggregateEventTag[UserEvent]] =
  UserEvent.Tag.allTags

2. buildHandler

This is used to handle events by creating ReadSideHandler. We have used CassandraReadSide’s builder method to create the handler, which automatically handles readside offsets.
The argument passed to this method is the ID of the event processor that Lagom will use when it persists offsets to its offset store.

readSide.builder[UserEvent]("userEventOffset")

We also need to register two callbacks – globalPrepare and prepare.

  • Global Prepare

This callback isused for creating tables and preparing any data that needs to be available before read side processing starts. This runs at least once across the whole cluster.
We can create tables like:

def createTable(): Future[Done] = {
  session.executeCreateTable(
    """
      |CREATE TABLE IF NOT EXISTS usertable(
      |id text PRIMARY KEY,
      |name text,
      |age int
      |);
    """.stripMargin)
}

After that we can register the globalPrepare callback in buildHandler method using setGlobalPrepare.

readSide.setGlobalPrepare(createTable)
  • prepare

This callaback is executed once per shard, when the read side processor starts up. It can be used for preparing statements in order to optimize Cassandra’s handling of them.
Example of creating prepared statements for insert queries looks like this:

def createPreparedStatements: Future[Done] = {
  for{
    userPreparedStatement <- session.prepare("INSERT INTO usertable(id, name, age) VALUES (?, ?, ?)")
  } yield{
    userStatement = userPreparedStatement
    Done
  }
}

This callback also needs to be registered inside buildHandler method using setPrepare.

readSide.setPrepare(_ => createPreparedStatements)
  • Event Handler

The event handlers are responsible for handling the actual events. They take an event, and return a list of bound statements. Example of handling User Event looks like this:

def storeUser(user: User): Future[List[BoundStatement]] = {
  val userBindStatement = userStatement.bind()
  userBindStatement.setString("id", user.id)
  userBindStatement.setString("name", user.name)
  userBindStatement.setInt("age", user.age)
  Future.successful(List(userBindStatement))
}

This is then registered with the builder using setEventHandler.

readSide.setEventHandler[UserCreated](e ⇒ storeUser(e.event.user))

Registering your read-side processor

You need to register Read-side processor with your microservice. This is done inside your application service loader.

readSide.register(wire[UserProcessor])

To query the cassandra tables, you can create a method and use cassandra select statements. An example for getting user details for our application is :

def getUserByName(name: String): Future[Option[User]] =
  session.selectOne(s"SELECT * FROM usertable WHERE name = '$name'").map{optRow =>
    optRow.map{row =>
      val id = row.getString("id")
      val name = row.getString("name")
      val age = row.getInt("age")
      User(id, name, age)
   }
  }

The complete demo code is available here. You can check README.md file for instructions to run the application.

References:

Happy Blogging.!


knoldus-advt-sticker


 

Written by 

I am a Software Consultant, having experience of more than 1 year. I am well versed with Object Oriented Programming Paradigms having good command of programming languages like Scala, Java & C++ and also skilled in building the microservices architecture based application using Lagom, Cassandra, Elasticsearch and many more. My hobbies include reading novels, writing blogs, drawing, listening to music.

4 thoughts on “Persistent Read Side in Lagom

Leave a Reply

%d bloggers like this: