Axon Framework: Event Sourcing with MongoDB


If you want to build microservices in Java, Axon is a framework worth considering. It provides the building blocks for the CQRS (Command Query Responsibility Segregation), Event Sourcing, and DDD (Domain-Driven Design) architectural patterns, including command handling, event routing, event sourcing, and snapshotting. Its APIs make it straightforward to build event-driven applications and remove much of the boilerplate infrastructure code you would otherwise write yourself. In this blog, you will learn how to build an event-sourced application with the Axon Framework, using MongoDB as the event store.

What is Event Sourcing?

Event sourcing is a design pattern that captures an application’s state changes as a sequence of events. Every operation on the data is driven by an event, and each event is persisted in an append-only event store. Terminology used in the event sourcing context:

Events: Objects that describe something that has occurred in the application. For example, in a shopping cart microservice, an ItemAddedToCart event occurs when an item is added, and the application needs to handle that event.

Event Bus: It is the mechanism that dispatches events to the subscribed event handlers.

Event Store: An event store is an event bus that can persist published events and can retrieve previous events based on a given aggregate identifier.
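The three terms above can be sketched without any framework. The following is a minimal, in-memory illustration and not Axon's implementation: the class names InMemoryEventStore and EventStoreDemo, the fields of ItemAddedToCart, and the method names are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type, named after the shopping cart example above.
final class ItemAddedToCart {
    final String cartId;
    final String item;

    ItemAddedToCart(String cartId, String item) {
        this.cartId = cartId;
        this.item = item;
    }
}

// A toy event store: it dispatches events like an event bus and also keeps them.
final class InMemoryEventStore {
    private final List<ItemAddedToCart> log = new ArrayList<>(); // append-only log
    private final List<Consumer<ItemAddedToCart>> handlers = new ArrayList<>();

    // Event bus part: register a handler that is called for every published event.
    void subscribe(Consumer<ItemAddedToCart> handler) {
        handlers.add(handler);
    }

    // Persist the event first, then dispatch it to all subscribed handlers.
    void publish(ItemAddedToCart event) {
        log.add(event);
        for (Consumer<ItemAddedToCart> handler : handlers) {
            handler.accept(event);
        }
    }

    // Event store part: return the past events of one aggregate, in publication order.
    List<ItemAddedToCart> readEvents(String cartId) {
        List<ItemAddedToCart> result = new ArrayList<>();
        for (ItemAddedToCart event : log) {
            if (event.cartId.equals(cartId)) {
                result.add(event);
            }
        }
        return Collections.unmodifiableList(result);
    }
}

final class EventStoreDemo {
    public static void main(String[] args) {
        InMemoryEventStore store = new InMemoryEventStore();
        store.subscribe(event -> System.out.println("handled: " + event.item));
        store.publish(new ItemAddedToCart("cart-1", "book"));
        store.publish(new ItemAddedToCart("cart-2", "pen"));
        System.out.println("events for cart-1: " + store.readEvents("cart-1").size());
    }
}
```

The sketch never updates or deletes an entry in the log. That is what "append-only" means here: the current state of a cart is derived by reading its events back, not by overwriting a row.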

Follow the link to learn about the benefits of event sourcing.

Implement an Event Sourcing Application with Axon

Axon integrates nicely with the Spring Framework, so it is easy to build event sourcing applications with Axon and Spring.

Dependency

<dependency>
   <groupId>org.axonframework.extensions.mongo</groupId>
   <artifactId>axon-mongo</artifactId>
   <version>4.5</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
   <groupId>org.axonframework</groupId>
   <artifactId>axon-spring-boot-starter</artifactId>
   <version>4.5.10</version>
   <exclusions>
      <exclusion>
         <groupId>org.axonframework</groupId>
         <artifactId>axon-server-connector</artifactId>
      </exclusion>
   </exclusions>
</dependency>

In the above, axon-server-connector is excluded from the Axon dependency because we are going to use MongoDB as the event store instead of Axon Server. Go through this video to understand your event storage requirements.

Configure MongoDB as an event store

Use MongoEventStorageEngine to define MongoDB as EventStorageEngine for an application.

// Event store that delegates persistence to the configured storage engine.
@Bean
public EmbeddedEventStore eventStore(EventStorageEngine storageEngine,
                                     AxonConfiguration configuration) {
    return EmbeddedEventStore.builder()
            .storageEngine(storageEngine)
            .messageMonitor(configuration.messageMonitor(EventStore.class, "paymenteventStore"))
            .build();
}

// Storage engine that keeps events and snapshots in MongoDB, serialized with Jackson.
@Bean
public EventStorageEngine storageEngine(MongoClient client) {
    return MongoEventStorageEngine.builder()
            .mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build())
            .eventSerializer(JacksonSerializer.defaultSerializer())
            .snapshotSerializer(JacksonSerializer.defaultSerializer())
            .build();
}

Application properties

spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.username=root
spring.data.mongodb.password=rootpassword
spring.data.mongodb.authentication-database=admin
axon.serializer.general=jackson
axon.serializer.events=jackson
axon.serializer.messages=jackson

Implement an Aggregate

An aggregate is a building block for building an event-sourced or CQRS application with Axon. In Axon, an aggregate is a regular Java object that contains application state and the methods to alter that state. An aggregate handles the commands and events that occur within its scope.

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateLifecycle;
import org.axonframework.spring.stereotype.Aggregate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Aggregate // (1)
public class PaymentAggregate {

    private static final Logger LOGGER = LoggerFactory.getLogger(PaymentAggregate.class);

    @AggregateIdentifier // (2)
    private String paymentId;
    private PaymentState paymentState;

    // No-argument constructor that Axon needs to rebuild the aggregate from its events.
    PaymentAggregate() {}

    @CommandHandler // (3)
    public PaymentAggregate(CheckInCommand checkInCommand) {
        LOGGER.info("check in received for Id - {}.", checkInCommand.getCharge().getId());
        // (4)
        AggregateLifecycle.apply(new PreAuthorize(checkInCommand.getId(),
                checkInCommand.getCharge()));
    }

    @EventSourcingHandler // (5)
    public void on(PreAuthorize preAuthorize) {
        Charge charge = preAuthorize.getCharge();
        LOGGER.info("Pre Authorization event for id - {}.", charge.getId());
        this.paymentId = charge.getId();
        this.paymentState = PaymentState.NEW;
        if (charge.getAvailableAmount().compareTo(charge.getHoldAmount()) > 0) {
            LOGGER.info("Sufficient amount available for hotel services for id {}",
                    charge.getId());
            AggregateLifecycle.apply(new PreAuthorizationApprove(charge.getId(), charge));
        } else {
            LOGGER.info("Sufficient amount not available for hotel services for id {}",
                    charge.getId());
            AggregateLifecycle.apply(new PreAuthorizationDeclined(charge.getId(), charge));
        }
    }
}

In the above aggregate class:

1. The @Aggregate annotation tells Axon to treat the class PaymentAggregate as an aggregate. By default, Axon configures it as an event-sourced aggregate, so every event applied by this aggregate is persisted to the configured event store and published to the subscribed event handlers.

2. The paymentId field must be unique for each PaymentAggregate instance. The @AggregateIdentifier annotation tells Axon which aggregate instance a particular command or event belongs to. This is a hard requirement.

3. The @CommandHandler annotation tells Axon that the annotated PaymentAggregate constructor handles the CheckInCommand. Because the handler is a constructor, handling this command creates a new aggregate instance. This is the place for decision-making, validation, and business logic. To send a command to its command handler, use the Axon CommandGateway.

private final CommandGateway commandGateway;

commandGateway.send(new CheckInCommand(UUID.randomUUID().toString(), charge));

4. The static AggregateLifecycle.apply(new PreAuthorize(checkInCommand.getId(), checkInCommand.getCharge())) call publishes the provided PreAuthorize event as an EventMessage within the scope of the aggregate it is applied in.

5. The @EventSourcingHandler annotation tells Axon that the annotated method should be called when the aggregate is ‘sourced from its events’. In other words, methods annotated with @EventSourcingHandler are triggered whenever the event they handle occurs for the aggregate, both when it is first applied and when the aggregate is later reloaded from the event store. Here, the annotated method is called for the PreAuthorize event and updates the state of PaymentAggregate. All state changes happen in @EventSourcingHandler methods. Once the event is persisted in the event store, the event bus invokes all the other, external event handlers, for example an @EventHandler on the query side that stores some data:

@EventHandler
public void on(PreAuthorize preAuthorize) {
    Charge charge = preAuthorize.getCharge();
    LOGGER.info("Handling preAuthorize event for id {}", charge.getId());
    payment.setId(charge.getId());
    payment.setPaymentState(PaymentState.NEW);
    payment.setAmount(charge.getAvailableAmount());
    paymentRepository.save(payment);
}
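To make ‘sourced from its events’ more concrete, here is a framework-free sketch of the idea. It is not how Axon is implemented internally. The names SketchPaymentAggregate, PaymentEvent, PreAuthorized, and Approved, and the string-based state, are simplified stand-ins invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the article's events; the single field is an assumption.
abstract class PaymentEvent {
    final String paymentId;

    PaymentEvent(String paymentId) {
        this.paymentId = paymentId;
    }
}

final class PreAuthorized extends PaymentEvent {
    PreAuthorized(String paymentId) { super(paymentId); }
}

final class Approved extends PaymentEvent {
    Approved(String paymentId) { super(paymentId); }
}

// Framework-free aggregate: state changes only inside on(...), never in command methods.
final class SketchPaymentAggregate {
    private String paymentId;
    private String state = "NONE";
    private final List<PaymentEvent> uncommitted = new ArrayList<>();

    // Command side: validate, then record the decision as an event.
    void checkIn(String id) {
        if (!"NONE".equals(state)) {
            throw new IllegalStateException("payment already started");
        }
        apply(new PreAuthorized(id));
    }

    void approve() {
        if (!"NEW".equals(state)) {
            throw new IllegalStateException("nothing to approve");
        }
        apply(new Approved(paymentId));
    }

    // A new event updates the state and is queued for the event store.
    private void apply(PaymentEvent event) {
        on(event);
        uncommitted.add(event);
    }

    // Event sourcing side: the only place where fields are assigned.
    private void on(PaymentEvent event) {
        if (event instanceof PreAuthorized) {
            paymentId = event.paymentId;
            state = "NEW";
        } else if (event instanceof Approved) {
            state = "APPROVED";
        }
    }

    // Rebuild an aggregate by replaying its stored history; nothing is queued again.
    static SketchPaymentAggregate replay(List<PaymentEvent> history) {
        SketchPaymentAggregate aggregate = new SketchPaymentAggregate();
        for (PaymentEvent event : history) {
            aggregate.on(event);
        }
        return aggregate;
    }

    String state() { return state; }

    String paymentId() { return paymentId; }

    List<PaymentEvent> uncommittedEvents() { return new ArrayList<>(uncommitted); }
}
```

Because the command methods never assign fields directly, replaying the same events through on(...) always yields the same state as the original run. That property is what lets an event-sourced aggregate be reloaded from the event store.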

Structure of domain events in the event store

The `MongoEventStorageEngine` stores each event in a separate MongoDB document.

{
  "_id" : ObjectId("62cd556760647a242065ef75"),
  "aggregateIdentifier" : "113",
  "type" : "PaymentAggregate",
  "sequenceNumber" : NumberLong(0),
  "serializedPayload" : "{\"id\":\"4b20db2c-5b55-4278-b3cf-959acf34c0b2\",\"charge\":{\"id\":\"113\",\"holdAmount\":365,\"availableAmount\":500,\"flow\":\"check-in\",\"date\":\"2022-05-5\",\"hotel\":\"The Lalit, New Delhi\",\"authorize\":false}}",
  "timestamp" : "2022-07-12T11:05:11.091Z",
  "payloadType" : "org.knoldus.axon.state.machine.event.PreAuthorize",
  "payloadRevision" : null,
  "serializedMetaData" : "{\"traceId\":\"f5269a43-16b2-4732-82d0-4bba90433b9a\",\"correlationId\":\"f5269a43-16b2-4732-82d0-4bba90433b9a\"}",
  "eventIdentifier" : "fbada547-c77f-4316-98bd-972dd1d24c7c"
}

{
  "_id" : ObjectId("62cd556760647a242065ef76"),
  "aggregateIdentifier" : "113",
  "type" : "PaymentAggregate",
  "sequenceNumber" : NumberLong(1),
  "serializedPayload" : "{\"id\":\"113\",\"charge\":{\"id\":\"113\",\"holdAmount\":365,\"availableAmount\":500,\"flow\":\"check-in\",\"date\":\"2022-05-5\",\"hotel\":\"The Lalit, New Delhi\",\"authorize\":false}}",
  "timestamp" : "2022-07-12T11:05:11.095Z",
  "payloadType" : "org.knoldus.axon.state.machine.event.PreAuthorizationApprove",
  "payloadRevision" : null,
  "serializedMetaData" : "{\"traceId\":\"f5269a43-16b2-4732-82d0-4bba90433b9a\",\"correlationId\":\"f5269a43-16b2-4732-82d0-4bba90433b9a\"}",
  "eventIdentifier" : "6107e199-589c-4843-b08c-7330474d4a56"
}
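In the two documents above, both events share the aggregateIdentifier "113" and differ in sequenceNumber (0, then 1). Together these two fields place each event in its aggregate's history. The sketch below shows, in plain Java, why that pair is useful: it orders events for replay and lets a store reject a second event with the same position. The classes StoredEvent and SequencedEventLog are hypothetical, and the duplicate check is an assumption about how an event store would typically behave, not code taken from the Mongo extension.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Mirrors a few fields of the stored documents; the class itself is hypothetical.
final class StoredEvent {
    final String aggregateIdentifier;
    final long sequenceNumber;
    final String payloadType;

    StoredEvent(String aggregateIdentifier, long sequenceNumber, String payloadType) {
        this.aggregateIdentifier = aggregateIdentifier;
        this.sequenceNumber = sequenceNumber;
        this.payloadType = payloadType;
    }
}

final class SequencedEventLog {
    // aggregateIdentifier -> (sequenceNumber -> event), kept sorted by sequence number
    private final Map<String, TreeMap<Long, StoredEvent>> streams = new HashMap<>();

    // Append an event; a second event at the same position of the same aggregate is rejected.
    void append(StoredEvent event) {
        TreeMap<Long, StoredEvent> stream =
                streams.computeIfAbsent(event.aggregateIdentifier, id -> new TreeMap<>());
        if (stream.containsKey(event.sequenceNumber)) {
            throw new IllegalStateException("duplicate sequence number "
                    + event.sequenceNumber + " for aggregate " + event.aggregateIdentifier);
        }
        stream.put(event.sequenceNumber, event);
    }

    // Read one aggregate's events in sequence-number order, ready for replay.
    List<StoredEvent> read(String aggregateIdentifier) {
        TreeMap<Long, StoredEvent> stream = streams.get(aggregateIdentifier);
        if (stream == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(stream.values());
    }
}
```

Rejecting a duplicate position acts as a simple form of optimistic concurrency control. If two writers load the same aggregate and both try to append the next event, only one of them succeeds.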

You can find the complete code here!

Written by 

Exploring Big Data Technologies.