A CRUD application: Lagom ES & CQRS with MySQL

In this blog, we will look at how we implemented a CRUD application with Lagom using a relational database rather than a NoSQL database.

Lagom is a framework built on top of Akka and Play, intended to let users build microservice-based applications easily and reliably. The framework and the concepts behind it are heavily based on CQRS (Command Query Responsibility Segregation) and ES (Event Sourcing), which dictates how state is handled and persisted internally. By default, Lagom uses Cassandra, a NoSQL store, for persistence. In my scenario, however, a NoSQL database was more than I needed: I had very few records, so Cassandra would have been an overhead, since it requires proper data modeling and makes updating a field cumbersome. So I chose MySQL as my database instead of Cassandra. Now let’s get started:

Dependency



libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.15"
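
For context, here is a minimal sketch of where that line might sit in build.sbt. It assumes the project uses the Lagom sbt plugin, which provides the lagomScaladslPersistenceJdbc dependency for JDBC-backed persistence; the module name user-impl is hypothetical.

```scala
// build.sbt (sketch): `user-impl` is a hypothetical module name.
lazy val `user-impl` = (project in file("user-impl"))
  .enablePlugins(LagomScala)
  .settings(
    libraryDependencies ++= Seq(
      // Lagom's JDBC persistence module, defined by the Lagom sbt plugin.
      lagomScaladslPersistenceJdbc,
      // MySQL JDBC driver, as in the line above.
      "mysql" % "mysql-connector-java" % "8.0.15"
    )
  )
```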

Configuration

First, I added the MySQL configuration for my database to the application.conf file:



play.application.loader = com.knoldus.user.impl.UserLoader

jdbc-defaults.slick {
  profile = "slick.jdbc.MySQLProfile$"
  jndiName = DefaultDS
}

db.default {
  driver = "com.mysql.cj.jdbc.Driver"
  url = "jdbc:mysql://localhost:3306/db?autoReconnect=true"
  username = ${?USERNAME}
  password = ${?PASSWORD}
}

After providing the configuration, I created the database itself. In Cassandra the keyspace is generated automatically, but in MySQL we have to create the database ourselves (for example, with CREATE DATABASE db;). Its name, db here, must match the one in the URL configured above:

db.default.url = "jdbc:mysql://localhost:3306/db?autoReconnect=true"

Event processor

The event processor we usually create for Cassandra did not work for me, so I adapted it for MySQL by overriding the handler methods explicitly:



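import akka.{Done, NotUsed}
import akka.stream.scaladsl.Flow
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import scala.concurrent.Future

override def buildHandler(): ReadSideProcessor.ReadSideHandler[Event] = {
  new ReadSideHandler[Event] {
    // Creates the table before any events are processed.
    override def globalPrepare(): Future[Done] = createTable

    // mapAsync(4) runs up to four queries concurrently; use mapAsync(1)
    // if events for the same user must be applied strictly in order.
    override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] = {
      Flow[EventStreamElement[Event]].mapAsync(4) { eventElement =>
        eventElement.event match {
          case addEvent: AddEvent => processAdded(addEvent)
          case updateEvent: UpdateEvent => processUpdated(updateEvent)
          case deleteEvent: DeleteEvent => processDeleted(deleteEvent)
        }
      }
    }
  }
}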

where:
createTable is the method that creates the table, say, a user table.
processAdded is the method that executes the insert query.
processUpdated is the method that updates a record in the user table.
processDeleted is the method that deletes a record from the user table.
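
For context, the handler above lives inside a ReadSideProcessor, which also has to declare the event tags it consumes. The following is only a sketch of how such a class might be laid out. The event model, the UserRepository trait, and the class name UserProcessor are assumptions made for illustration and are not taken from the original code. It uses mapAsync(1) rather than 4 so that events are applied one at a time.

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.Flow
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, AggregateEventTag, EventStreamElement, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import scala.concurrent.Future

// Hypothetical event model; the real one may differ.
final case class User(orgId: Int, email: String, name: String)

sealed trait Event extends AggregateEvent[Event] {
  override def aggregateTag: AggregateEventTag[Event] = Event.Tag
}
object Event {
  val Tag: AggregateEventTag[Event] = AggregateEventTag[Event]()
}
final case class AddEvent(user: User) extends Event
final case class UpdateEvent(user: User) extends Event
final case class DeleteEvent(orgId: Int) extends Event

// Hypothetical interface for the query methods listed above.
trait UserRepository {
  def createTable: Future[Done]
  def processAdded(event: AddEvent): Future[Done]
  def processUpdated(event: UpdateEvent): Future[Done]
  def processDeleted(event: DeleteEvent): Future[Done]
}

class UserProcessor(repository: UserRepository) extends ReadSideProcessor[Event] {

  // Tells Lagom which tagged event stream this processor reads.
  override def aggregateTags: Set[AggregateEventTag[Event]] = Set(Event.Tag)

  override def buildHandler(): ReadSideHandler[Event] =
    new ReadSideHandler[Event] {
      override def globalPrepare(): Future[Done] = repository.createTable

      // One event at a time, so an update cannot overtake its insert.
      override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
        Flow[EventStreamElement[Event]].mapAsync(1) { element =>
          element.event match {
            case e: AddEvent    => repository.processAdded(e)
            case e: UpdateEvent => repository.processUpdated(e)
            case e: DeleteEvent => repository.processDeleted(e)
          }
        }
    }
}
```

Keeping the queries behind a separate repository means the processor only decides which query runs for which event.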

Repository

Remember to inject a JdbcSession (session: JdbcSession) into your class; it provides the connection used to execute the queries.



def createTable: Future[Done] = {
  val query =
    """
      |CREATE TABLE IF NOT EXISTS user (
      |orgId INT NOT NULL,
      |email VARCHAR(64) NOT NULL,
      |name VARCHAR(64) NOT NULL,
      |PRIMARY KEY (orgId))
    """.stripMargin
  // map needs an implicit ExecutionContext in scope.
  session.withConnection(_.prepareStatement(query).execute()).map(_ => Done)
}

In the same way, we can insert into MySQL using the method below:



def processAdded(addEvent: AddEvent): Future[Done] = {
  session.withConnection { connection =>
    // Values are bound through placeholders rather than concatenated into the SQL.
    val statement = connection.prepareStatement("INSERT INTO user (orgId, email, name) VALUES (?, ?, ?)")
    statement.setInt(1, addEvent.user.orgId)
    statement.setString(2, addEvent.user.email)
    statement.setString(3, addEvent.user.name)
    statement.execute()
  }.map(_ => Done)
}

processUpdated(updateEvent) and processDeleted(deleteEvent) can be created in the same way.
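
As a rough sketch, those two methods might look like the following. The shapes of UpdateEvent and DeleteEvent and the class name JdbcUserRepository are assumptions; the code above only shows that AddEvent carries a user with orgId, email, and name. In the real service the events would also extend the Event trait.

```scala
import akka.Done
import com.lightbend.lagom.scaladsl.persistence.jdbc.JdbcSession
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical shapes, simplified for illustration.
final case class User(orgId: Int, email: String, name: String)
final case class UpdateEvent(user: User)
final case class DeleteEvent(orgId: Int)

class JdbcUserRepository(session: JdbcSession)(implicit ec: ExecutionContext) {

  // Updates the email and name of the row identified by orgId.
  def processUpdated(updateEvent: UpdateEvent): Future[Done] =
    session.withConnection { connection =>
      val statement =
        connection.prepareStatement("UPDATE user SET email = ?, name = ? WHERE orgId = ?")
      try {
        statement.setString(1, updateEvent.user.email)
        statement.setString(2, updateEvent.user.name)
        statement.setInt(3, updateEvent.user.orgId)
        statement.executeUpdate()
      } finally statement.close() // release the statement even if the query fails
    }.map(_ => Done)

  // Deletes the row identified by orgId.
  def processDeleted(deleteEvent: DeleteEvent): Future[Done] =
    session.withConnection { connection =>
      val statement = connection.prepareStatement("DELETE FROM user WHERE orgId = ?")
      try {
        statement.setInt(1, deleteEvent.orgId)
        statement.executeUpdate()
      } finally statement.close()
    }.map(_ => Done)
}
```

Closing each statement in a finally block is a small addition over the insert method shown earlier; it keeps statements from lingering if a query throws.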

Conclusion

The rest (State, Command, Event, and Entity) stays the same as with Cassandra. I had little data to deal with, so I chose MySQL. We can use Postgres as well. It depends on the use case: if you have large data and few updates, go with Cassandra and proper data modeling. You can refer to the lagom-scala-mysql-es repository on GitHub for the code.

That’s it, pretty easy. We have successfully implemented Lagom Event Sourcing with MySQL. I hope it helped. If you have any queries or doubts, please comment below; I would be happy to answer.

Till then, stay tuned!! 🙂



Written by 

Charmy is a Software Consultant with more than 1.5 years of experience. She is familiar with object-oriented programming paradigms and with technologies such as Scala, Lagom, Java, Apache Solr, Apache Spark, Apache Kafka, and Apigee. She is always eager to learn new concepts in order to expand her horizons. Her hobbies include playing guitar and sketching.