Lagom Persistence API with Couchbase

In this blog, I will talk about using Couchbase with Lagom’s Persistent Entity API. And then we will see how we can query for fetching the data.

We already know that Lagom handles data persistence with ‘Persistent Entity’ which holds the state of individual entities. And to interact with them one must know the identifier of the entity. So Lagom provides the support to build a read-side view of the persistent data. And this process of separation of the write and read of the persistent data is often referred to as CQRS.

Read side can be implemented using any database, but in this blog, I will be showcasing its use with Couchbase. Lagom’s Couchbase support is provided by akka-persistence-couchbase plugin

Configure Couchbase with Lagom

To use Couchbase with Lagom, we need to add the following dependency to the build file.

libraryDependencies += "com.lightbend.akka" %% "lagom-javadsl-persistence-couchbase" % "1.0-RC2"

Now to use Couchbase with Lagom, we must always ensure that each service must use a unique bucket name. This ensures that the documents of different services do not conflict with each other. We need to configure the connections and buckets that are used for these documents in each of the service implementation. As we all know there are three Lagom components that will require the Couchbase configurations. Let see each of them.

Journal and Snapshot Store

The journal is responsible for storing the serialised events. And the snapshot store is responsible for storing the snapshots of the state. These are used to support faster recovery. The following configurations need to be provided in the application.conf file.

sample-service.couchbase.bucket = "akka"

couchbase-journal {
  write.bucket = ${sample-service.couchbase.bucket}
  snapshot.bucket = ${sample-service.couchbase.bucket}

   connection {
    nodes = ["cluster"]
    username = "Knoldus"
    password = "*****"
  }
}

Offset Store

The offset store is responsible for keeping a track of the most recent event handled by each read-side processor. It is used by the read-side.

sample-service.couchbase.bucket = "akka"

lagom.persistence.read-side.couchbase {
  bucket = sample-service.couchbase.bucket

  connection {
    nodes = ["cluster"]
    username = "Knoldus"
    password = "*****"
  }
}

NOTE : While different services should be isolated by using different buckets, it is perfectly fine to use the same bucket for all of these components within one service.

Configuring Indexes

The bucket that is configured in the couchbase-journal.write.bucket, couchbase-journal.snapshot.bucket and lagom.persistence.read-side.couchbase.bucket needs to have the following indexes to allow the plugin to function.

// The journal requires the following two indexes
CREATE INDEX `persistence-ids` on `akka` (`persistence_id`) 
  WHERE `type` = "journal_message"
  
CREATE INDEX `sequence-nrs` on `akka` 
  (DISTINCT ARRAY m.sequence_nr FOR m in messages END) 
  WHERE `type` = "journal_message"

//To support query side with event-for-tags the following two indexes will be required
CREATE INDEX `tags` ON 
`akka`((ALL (ARRAY (ALL (ARRAY [`t`, (`m`.`ordering`)] FOR `t` IN (`m`.`tags`) END)) FOR `m` IN `messages` END))) 
WHERE (`type` = "journal_message")

CREATE INDEX `tag-seq-nrs` ON 
`akka`((ALL (ARRAY (ALL (ARRAY [`persistence_id`, `t`.`tag`, `t`.`seq_nr`] FOR `t` IN (`m`.`tag_seq_nrs`) END)) FOR `m` IN `messages` END))) 
WHERE (`type` = "journal_message")

//To support snapshot 
CREATE INDEX `snapshots` ON `akka` (persistence_id, sequence_nr) 
WHERE akka.type = "snapshot"

Setting up the plugin

Since Lagom by default uses Akka’s akka-persistence-cassandra, we need to specify in the `application.conf` that we will be using different plugin :

akka.persistence.journal.plugin = "couchbase-journal.write"

This configures Akka to use the couchbase journal and snapshot plugin and configures the Couchbase to be used for storing the events and snapshots.

This is all that we need in the configuration and now we are good to use Couchbase with Lagom. Let’s go ahead and start using it, start by retrieving from couchbase.

public CompletionStage<List<UserGreeting>> listUserGreetings() {
        return couchbaseSession.get(Constants.DOC_ID)
                .thenApply(docOpt -> {
                    if (docOpt.isPresent()) {
                        JsonObject content = docOpt.get().content();
                        return content.getNames().stream().map(
                                name -> new UserGreeting(name, content.getString(name))
                        ).collect(Collectors.toList());
                    } else {
                        return Collections.emptyList();
                    }
                });
    }

CouchbaseSession provides several methods in different flavours for executing queries. There are methods for streaming a result set, which can be useful when the result set is big. All methods in CouchbaseSession are non-blocking and they return a CompletionStage or a Source.

The read-side

The next part is how do we update the read-side, for that we need to transform the events generated by the Persistent Entities into a Couchbase document that can be queried. So let us implement a ReadSideProcessor with help of CouchbaseReadSide. As we know that in a ReadSideProcessor ,we need to provide an implementation for the buildHandler() and aggregateTags().

 @Override
    public ReadSideHandler<GreetingEvent> buildHandler() {
        return couchbaseReadSide.<GreetingEvent>builder(Constants.ReadSideId)
                .setGlobalPrepare(this::globalPrepare)
                .setEventHandler(GreetingEvent.GreetingMessageChanged.class, this::processGreetingMessageChanged)
                .build();
    }

The buildHandler() is responsible for creating the ReadSideHandler that will handle events. It also has two callbacks, one is a global prepare callback, the other is a regular prepare callback.

CouchbaseReadSide has a builder method for creating a builder for these handlers, this builder will create a handler that will automatically handle read-side offsets for you.

The argument passed to this method is the ID of the event processor that Lagom will use when it persists offsets to its offset store.

The globalPrepare() is for creating documents and preparing any data that needs to be available before read side processing starts.

 @Override
    public PSequence<AggregateEventTag<GreetingEvent>> aggregateTags() {
        return GreetingEvent.TAG.allTags();
    }

Now we need to register the read-side processor as we usually do with Lagom.

The last and final part remaining is the EventHandler. The event handlers take an event and execute updates. The EventHandler is registered with the builder using setEventHandler (shown is above example).

So this is all that is required to replace the default Cassandra with the Couchbase. But there are a few things to keep in mind while using Couchbase with Lagom like :

  1. Lagom will NOT start Couchbase service automatically.
  2. Couchbase connection can ONLY be configured statically. Dynamically locatable Couchbase server is not supported at the moment.

NOTE: For complete code visit : https://github.com/knoldus/lagom-es-couchbase.g8/tree/master/src/main/g8

References:

  • https://doc.akka.io/docs/akka-persistence-couchbase/current/overview.html
  • https://github.com/lagom/lagom-recipes
  • https://www.lagomframework.com/documentation/1.4.x/java/PersistentEntity.html

knoldus-advt-sticker

Written by 

Pallavi is a Software Consultant, with more than 3 years of experience. She is very dedicated, hardworking and adaptive. She is Technology agnostic and knows languages like Scala and Java. Her areas of interests include microservices, Akka, Kafka, Play, Lagom, Graphql, Couchbase etc. Her hobbies include art & craft and photography.

Knoldus Pune Careers - Hiring Freshers

Get a head start on your career at Knoldus. Join us!