As we all know,
Event Sourcing is a practical approach to design a service that reliably/atomically updates the database and transmits messages/events. But before I tell you how to do it with KALIX, let me give you a little overview of KALIX.
Kalix Introduction
Kalix is a considerable improvement over the existing serverless model in terms of abstraction of complexity. As it provides a unifying application layer that gathers all of the necessary pieces—including databases, message brokers, caches, services meshes, API gateways, blob storages, CDN networks, CI/CD products, and so on—and exposes them in a single unified programming model and DX, tailored for the cloud and edge. This comprises a programming model with well-defined holistic semantics that guarantees end-to-end guarantees and SLAs. Kalix and it’s Serverless DX enable developers to focus on the essence of value creation: creating direct end-user and business value, resulting in a cohesive, intelligible, predictable, and maintainable system maintained from the cloud.
Or, more simply, Kalix is a fully managed PaaS that enables the development of event-driven, data-centric, real-time, cloud-native apps. From start to finish, it’s a complete developer and zero-ops experience.

In the preceding section, I gave you a quick overview of Kalix and highlighted that it is a new way to simplify cloud-based app development.
Now, let’s start with the main part “Event Sourcing with KALIX”
Steps to achieve Event Sourcing with KALIX
In Kalix we can easily achieve the Event Sourcing model by defining/declaring the Event Sourced Entities.
Basically, event-sourced entities will scale horizontally, preserve their state using ACID semantics, and isolate failures. They employ the Event Sourcing Model, which means that instead of maintaining the present state, they persist in all of the events that lead to the current state. Kalix records these occurrences in a journal.
To implement an Event Sourced Entity, the following actions must be taken:
- Defining the API and domain objects in
.proto
files. - Adding behavior to command and event handlers.
- Creating and initializing the Entity.
This page’s sections lead you through these procedures using a bank-operations service as an example.
Defining proto files:-
- bank_operations_domain.proto
- bank_operations_api.proto
src/main/proto/bank/operations/domain/bank_operations_domain.proto
syntax = "proto3";
package bank.operations.domain;
option java_outer_classname = "BankOperationsDomain";
message AccountState{
string acc_no = 1;
double totalAmount = 2;
AccountDetails accountDetails = 3;
repeated Transaction transactions = 4;
}
message Transaction{
string id = 2;
string recipientName = 3;
OperationType operation = 4;
double amount = 5;
int64 created_dtm = 6;
double totalAmount = 7;
enum OperationType{
CREDITED = 0;
DEBITED = 1;
JOINING_BONUS = 2;
}
}
message AccountDetails{
string uid = 1;
string name = 2;
string address = 3;
string city = 4;
string state = 5;
int64 created_dtm = 6;
}
message AccountCreated{
string accNo = 1;
string uid = 2;
string name = 3;
string address = 4;
string city = 5;
string state = 6;
int64 created_dtm = 7;
}
message AccountCredited{
string transactionId = 1;
string recipientName = 2;
double amount = 3;
int64 createdDtm = 4;
}
message AccountDebited{
string transactionId = 1;
string recipientName = 2;
double amount = 3;
int64 createdDtm = 4;
}
In the domain proto, we have to define the objects of state and events.
src/main/proto/bank/operations/api/bank_operations_api.proto
syntax = "proto3";
package bank.operations.api;
option java_outer_classname = "BankOperationsApi";
import "kalix/annotations.proto";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
message Account{
string accNo = 1 [(kalix.field).entity_key = true];
string uid = 2;
string name = 3;
string address = 4;
string city = 5;
string state = 6;
int64 creationDate = 7;
}
message AccountInfo{
string acc_no = 1;
double totalAmount = 2;
AccountDetails accountDetails = 3;
repeated Transaction transactions = 4;
}
message AccountDetails{
string uid = 1;
string name = 2;
string address = 3;
string city = 4;
string state = 5;
int64 created_dtm = 6;
}
message Transactions{
repeated Transaction transactions = 1;
}
message Transaction{
string id = 1;
string recipientName = 2;
OperationType operation = 3;
double amount = 4;
int64 created_dtm = 5;
double totalAmount = 6;
}
enum OperationType{
CREDITED = 0;
DEBITED = 1;
JOINING_BONUS = 2;
}
message AccountCreationResponse{
enum IsAccountCreated{
CREATION_SUCCEED = 0;
CREATION_FAILED = 1;
ACCOUNT_EXIST = 2;
}
IsAccountCreated isAccountCreated = 2;
string reply = 3;
}
message AccountCreditRequest{
string accNo = 1 [(kalix.field).entity_key = true];
string recipientName = 2;
double amount = 3;
}
message AccountDebitRequest{
string accNo = 1 [(kalix.field).entity_key = true];
string recipientName = 2;
double amount = 3;
}
message AccountInformationRequest{
string accNo = 1 [(kalix.field).entity_key = true];
}
message AccountTransactionStatus{
string accNo = 1;
OperationType operationType = 2;
double amount = 3;
}
service BankOperations {
option (kalix.codegen) = {
event_sourced_entity: {
name: "bank.operations.domain.AccountEntity"
entity_type: "eventsourced-bank-operations"
state: "bank.operations.domain.AccountState"
events: [
"bank.operations.domain.AccountCreated",
"bank.operations.domain.AccountCredited",
"bank.operations.domain.AccountDebited"
]
}
};
option (kalix.service).acl.allow = { principal: ALL };
rpc createAccount(Account) returns (AccountCreationResponse) {}
rpc creditAccount(AccountCreditRequest) returns (AccountTransactionStatus) {
option (google.api.http) = {
post: "/account/credit"
body: "*"
};
}
rpc debitAccount(AccountDebitRequest) returns (AccountTransactionStatus) {
option (google.api.http) = {
post: "/account/debit"
body: "*"
};
}
rpc getAccountInformation(AccountInformationRequest) returns (AccountInfo) {
option (google.api.http) = {
get: "/account/{accNo}"
additional_bindings: {
get: "/account/{accNo}/statement"
response_body: "transactions"
}
additional_bindings:{
get: "/account/{accNo}/balance"
response_body: "totalAmount"
}
additional_bindings:{
get: "/account/{accNo}/details"
response_body: "accountDetails"
}
};
}
}
Let’s talk about the above API proto. As we know Event Sourcing is a method for storing an immutable sequence of events in order to maintain the application state. As a result of these events, state updates are triggered, causing the application’s state to be updated.
So, for the ES we describe the Commands, Events, and State. As we already described the State and Events in the domain proto, now we have to define the commands. So, all the above RPC calls are commands here, that have their request and response as messages.
Now, as we already discussed the event-sourced entities, at last, we just have to define the events and state. So, in the API proto file, we have to declare the service and its RPC calls and add the kalix’s event_sourced_entity component in it.
Adding Behaviour:-
Now, as per the above proto files, the Kalix will generate a AccountEntity
class for us to add the business logic because the file is not overwritten once it is created, we can easily add logic to it. This AccountEntity
class extends a generated abstract class AbstractAccountEntity
which we are not supposed to edit as it gets regenerated in case we update the protobuf descriptors.
class AccountEntity(context: EventSourcedEntityContext) extends AbstractAccountEntity {
override def emptyState: AccountState =
AccountState.defaultInstance
}
At first, we have to declare an empty state, and then we need to implement all methods our Event Sourced Entity offers as command handlers.
For example, there is an abstract method `createAccount` for account creation, that will handle the command for an account creation request. Now. we have to add the behavior/logic for the createAccount command as shown below.
override def createAccount(currentState: AccountState, account: Account):
EventSourcedEntity.Effect[AccountCreationResponse] = {
if(currentState.accNo.nonEmpty){
val response = AccountCreationResponse(
isAccountCreated = ACCOUNT_EXIST,
reply = "This Account is already exist, please generate new one!"
)
effects.reply(response)
}else{
val event = AccountCreated(
account.accNo,
account.uid,
account.name,
account.address,
account.city,
account.state,
account.creationDate
)
val response = AccountCreationResponse(
isAccountCreated = CREATION_SUCCEED,
reply = "Congratulations! Your account is created successfully and joining bonus is
added by Bank."
)
effects.emitEvent(event)
.thenReply(_ => response)
}
}
In the above example, the createAccount
method emits an event to update the current state, Now, we need to implement the Event handler for the particular incoming event so that the event will be stored successfully.
For example, we have a method accountCreated
as an event handler for the AccountCreared
event.
override def accountCreated(currentState: AccountState, accountCreated: AccountCreated):
AccountState ={
AccountState(
accountCreated.accNo,
100.0,
Some(domain.AccountDetails(
uid = accountCreated.uid,
name = accountCreated.name,
address = accountCreated.address,
city = accountCreated.city,
state = accountCreated.state,
createdDtm = accountCreated.createdDtm
)),
Seq(
domain.Transaction(
id = UUID.randomUUID().toString,
recipientName = "BANK",
operation = OperationType.JOINING_BONUS,
amount = 100.0,
createdDtm = accountCreated.createdDtm,
totalAmount = 100.0
)
)
)
}
So, till now we created the event-sourced entities and added their behaviors. now, the final and last step is to initialize the entity.
Registering the Entity
We must register the Event Sourced Entity with the service for Kalix to recognize it.
The registration is automatically inserted in the produced KalixFactory.withComponents method from the Main class during code generation.
object Main {
private val log = LoggerFactory.getLogger("bank.operations.Main")
def createKalix(): Kalix = {
// The KalixFactory automatically registers any generated Actions, Views or Entities,
// and is kept up-to-date with any changes in your protobuf definitions.
// If you prefer, you may remove this and manually register these components in a
// `Kalix()` instance.
KalixFactory.withComponents(
new AccountEntity(_)
)
}
def main(args: Array[String]): Unit = {
log.info("starting the Kalix service")
createKalix().start()
}
}
All the steps are completed.
For more information regarding Kalix, you can visit its official documentation.
Note: You can find the whole implementation of the above example here.