
1. Introduction
In this Spring Reactor lesson, we will create a Spring Boot project and start producing and consuming messages within the same application. We will also touch on the Reactive Streams specification and build a simple reactive application.
2. Reactor in the JVM
Reactor, as described by Spring, is a foundational framework for asynchronous applications on the JVM, designed to perform well even on modest hardware.
3. Making the Spring Boot Project with Maven
maven-archetype-quickstart is an archetype that generates a sample Maven project:
Creating a Project
mvn archetype:generate -DgroupId=com.test.example -DartifactId=JCG-BootReactor-Example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
4. Adding Maven dependencies
The next step is to add the appropriate Maven dependencies to the project. We will add these dependencies:
- spring-boot-starter-web: This dependency marks the project as a Web project and adds the dependencies a Web project needs
- reactor-bus: This dependency brings all the Reactor-related dependencies onto the project classpath
- spring-boot-starter-test: This dependency collects all test-related JARs, such as JUnit and Mockito, into the project
5. Project skeleton
We organize the code into multiple packages so that it stays modular, which makes the project easier to extend.
6. Understanding the sample application
This application receives updates from an external provider about the location of a shipment being delivered to a customer at a given address. Once the application receives an update, it performs several operations:
- Update the location of the shipment in the database
- Send a notification to the user's mobile device
- Send an email notification
- Send an SMS to the user
We handle these operations reactively because the user does not depend on them happening in exact real time. They are mostly background tasks that can take a while, and the user is not greatly affected if the status update for a shipment arrives a few minutes late. Let's get started with the model creation first.
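Before moving on, the idea of running these follow-up operations in the background can be sketched in plain Java. This is only an illustration built on java.util.concurrent, not the Reactor code used later in this lesson; the class name BackgroundTasksSketch and the task names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class BackgroundTasksSketch {

    // Hypothetical task names mirroring the four operations listed above.
    static final List<String> TASKS =
            List.of("update-database", "push-notification", "email", "sms");

    // Starts every follow-up task on the pool and returns at once with the pending futures.
    static List<CompletableFuture<String>> onLocationUpdate(String shipmentId, ExecutorService pool) {
        return TASKS.stream()
                .map(task -> CompletableFuture.supplyAsync(
                        () -> task + " done for shipment " + shipmentId, pool))
                .collect(Collectors.toList());
    }

    // Runs the sketch end to end and returns the task results in list order.
    static List<String> runDemo(String shipmentId) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return onLocationUpdate(shipmentId, pool).stream()
                    .map(CompletableFuture::join) // wait only so the demo can show the results
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        runDemo("42").forEach(System.out::println);
    }
}
```

The caller of onLocationUpdate gets control back immediately, which is the property we want for the location update; the join calls exist only so the demo has something to print.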
7. Defining the POJO Model
We will start by defining our POJO, which represents a shipment being sent to a customer and has fields such as shipmentId, currentLocation, etc. Let's define this POJO here:
Shipment.java
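package com.test.example.model;

public class Shipment {
    private String shipmentId;
    private String name;
    private String currentLocation;
    private String deliveryAddress;
    private String status;

    // Accessors used by the service and the publisher later in this lesson
    public String getShipmentId() { return shipmentId; }
    public void setShipmentId(String shipmentId) { this.shipmentId = shipmentId; }

    // standard getters and setters for the remaining fields
}
We have defined some basic fields here, along with their getters and setters.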
8. Defining the Service
Next, we define the ShipmentService interface, which specifies the contract for the functionality:
ShipmentService.java
package com.test.example.service;

import com.test.example.model.Shipment;

public interface ShipmentService {
    // Declares InterruptedException because the implementation blocks with Thread.sleep
    void shipmentLocationUpdate(Shipment shipment) throws InterruptedException;
}
We only have one method definition in this interface. Let's now move on to implementing this service, where we simulate a slow operation with Thread.sleep.
ShipmentServiceImpl.java
package com.test.example.service;

import com.test.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class ShipmentServiceImpl implements ShipmentService {

    private final Logger LOG = LoggerFactory.getLogger("ShipmentService");

    @Override
    public void shipmentLocationUpdate(Shipment shipment) throws InterruptedException {
        LOG.info("Shipment data: {}", shipment.getShipmentId());
        Thread.sleep(3000);
        LOG.info("Shipment with ID: {} reached at knoldus!!!", shipment.getShipmentId());
    }
}
9. Defining the Event Consumer
Here we declare an event handler which listens for events:
EventHandler.java
package com.test.example.handler;

import com.test.example.model.Shipment;
import com.test.example.service.ShipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.bus.Event;
import reactor.fn.Consumer;

@Service
public class EventHandler implements Consumer<Event<Shipment>> {

    private final ShipmentService shipmentService;

    @Autowired
    public EventHandler(ShipmentService shipmentService) {
        this.shipmentService = shipmentService;
    }

    @Override
    public void accept(Event<Shipment> shipmentEvent) {
        // Unwrap the payload carried by the event
        Shipment shipment = shipmentEvent.getData();
        try {
            shipmentService.shipmentLocationUpdate(shipment);
        } catch (InterruptedException e) {
            // do something as bad things have happened;
            // at minimum, restore the interrupt flag for the calling thread
            Thread.currentThread().interrupt();
        }
    }
}
The good thing about this consumer class is that it receives the Shipment object itself, wrapped in an Event, from the event bus.
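To make the register-and-dispatch idea concrete, here is a minimal sketch of an in-memory event bus in plain Java. It is not Reactor's EventBus and does not reproduce its API; SimpleEventBusSketch, its nested Event class, and the on and publish methods are hypothetical names chosen for illustration, and consumers are looked up by an exact topic string rather than by selectors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SimpleEventBusSketch<T> {

    // Hypothetical stand-in for an event wrapper: it carries the payload only.
    static final class Event<T> {
        private final T data;
        Event(T data) { this.data = data; }
        T getData() { return data; }
    }

    private final Map<String, List<Consumer<Event<T>>>> consumers = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    SimpleEventBusSketch(ExecutorService pool) { this.pool = pool; }

    // Registers a consumer under a topic key.
    void on(String topic, Consumer<Event<T>> consumer) {
        consumers.computeIfAbsent(topic, key -> new CopyOnWriteArrayList<>()).add(consumer);
    }

    // Hands the event to every consumer of the topic on a pool thread and returns at once.
    void publish(String topic, Event<T> event) {
        for (Consumer<Event<T>> consumer : consumers.getOrDefault(topic, List.of())) {
            pool.execute(() -> consumer.accept(event));
        }
    }

    // Publishes three events and returns the payloads the consumer received.
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SimpleEventBusSketch<String> bus = new SimpleEventBusSketch<>(pool);
        Queue<String> received = new ConcurrentLinkedQueue<>();
        bus.on("eventHandler", event -> received.add(event.getData()));
        for (int i = 0; i < 3; i++) {
            bus.publish("eventHandler", new Event<>("shipment-" + i));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the consumers run on pool threads, the order in which the payloads arrive is not guaranteed; only the set of received payloads is.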
10. Defining the Java Configuration
We can define the configuration of our application in Java. Let's write those definitions here:
ReactorConfig.java
package com.test.example.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;

@Configuration
public class ReactorConfig {

    @Bean
    Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    EventBus createEventBus(Environment env) {
        // Dispatch events on Reactor's thread pool dispatcher
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}
Clearly, there is nothing special here. We create the event bus on the thread pool dispatcher and leave the number of threads at its default.
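What a thread pool buys us can be sketched with the JDK's own executors. The example below is an assumption-laden stand-in, not Reactor's dispatcher: PoolDispatchSketch and runOnPool are hypothetical names, and the pool size is passed explicitly instead of coming from a default.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolDispatchSketch {

    // Runs taskCount short tasks on a fixed pool and returns the names of the threads that ran them.
    static Set<String> runOnPool(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        // Ten tasks share at most four worker threads.
        System.out.println(runOnPool(4, 10));
    }
}
```

However many tasks are submitted, the set of thread names never grows beyond the pool size, so the pool size caps how many events are processed at the same time.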
11. Defining the Spring Boot Application
Here we use the main class to run our application:
Application.java
package com.test.example;

import com.test.example.handler.EventHandler;
import com.test.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.bus.Event;
import reactor.bus.EventBus;

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class Application implements CommandLineRunner {

    private final Logger LOG = LoggerFactory.getLogger("Application");
    private final EventBus eventBus;
    private final EventHandler eventHandler;

    @Autowired
    public Application(EventBus eventBus, EventHandler eventHandler) {
        this.eventBus = eventBus;
        this.eventHandler = eventHandler;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        // Register the handler for the "eventHandler" topic
        eventBus.on($("eventHandler"), eventHandler);

        // Publish messages here
        for (int i = 0; i < 10; i++) {
            Shipment shipment = new Shipment();
            shipment.setShipmentId(String.valueOf(i));
            eventBus.notify("eventHandler", Event.wrap(shipment));
            LOG.info("Published shipment number {}.", i);
        }
    }
}
We implement the CommandLineRunner interface so that this class runs code with which we can test the producer and configuration code we wrote. In this class, we register the handler for the eventHandler topic, publish messages to that topic, and listen for them in the consumer class.
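The key behavior of this publisher is that it does not wait for the consumer. A small, deterministic sketch of that property in plain Java follows; it uses a JDK thread pool instead of the event bus, and PublishThenHandleSketch and runDemo are hypothetical names. A latch holds every handler back until the publishing loop has finished, standing in for slow work such as the three-second sleep in the service.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PublishThenHandleSketch {

    // Returns {events handled when publishing finished, events handled after the pool drained}.
    static int[] runDemo(int events) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch gate = new CountDownLatch(1); // keeps handlers busy until publishing is done
        AtomicInteger handled = new AtomicInteger();

        for (int i = 0; i < events; i++) {
            pool.execute(() -> {
                try {
                    gate.await(); // stands in for slow work
                    handled.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // The publishing loop completed although no handler has finished yet.
        int handledAtPublishEnd = handled.get();

        gate.countDown(); // let the handlers finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {handledAtPublishEnd, handled.get()};
    }

    public static void main(String[] args) {
        int[] result = runDemo(10);
        System.out.println("Handled at end of publishing: " + result[0]);
        System.out.println("Handled after draining the pool: " + result[1]);
    }
}
```

With ten events, the first number is 0 and the second is 10: all events were accepted for processing before any of them was handled.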
In the next section, we will run our project with a simple Maven command.
12. Running the project
Run the application using Maven:
mvn spring-boot:run
13. Conclusion
In this lesson, we saw a simple way of setting up a thread pool dispatcher, left at its default size, and a consumer which makes use of this thread pool to handle events in parallel. One of the most common problems in applications that rely on asynchronous execution is that they run out of memory quickly, because multiple threads occupy heap space and create objects as they process events. It is therefore important to assign an adequate heap size when starting the application, and that size depends directly on the size of the thread pool defined for the application.
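Besides sizing the heap, one common way to keep memory use predictable is to bound the backlog of pending work. The sketch below shows this with the JDK's ThreadPoolExecutor; it is not part of the Reactor setup above, and BoundedPoolSketch, boundedPool, and the chosen sizes (4 threads, 8 queued tasks) are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {

    // Builds a pool whose backlog can never exceed queueCapacity pending tasks.
    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself,
                // which slows the producer down instead of growing the heap.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits the given number of short tasks and returns how many were handled.
    static int runDemo(int events) {
        ThreadPoolExecutor pool = boundedPool(4, 8);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < events; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                handled.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MB): " + Runtime.getRuntime().maxMemory() / (1024 * 1024));
        System.out.println("Handled events: " + runDemo(100));
    }
}
```

No event is dropped here: when the queue is full, the producer simply does the work itself, so the number of objects waiting in memory stays bounded by the queue capacity plus the pool size.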
Reference
https://stackabuse.com/spring-reactor-tutorial/