Reactive Programming with Powerful Spring Reactor


1. Introduction

In this Spring Reactor lesson, we will create a Spring Boot project and produce and consume messages within the same application. Along the way we will touch on the Reactive Streams Specification and build a simple reactive application.

2. Reactor in the JVM

Reactor, as described by Spring, is a foundational framework for asynchronous applications on the JVM, one that is meant to perform well even on modest hardware.

3. Making the Spring Boot Project with Maven

maven-archetype-quickstart is an archetype which generates a sample Maven project:

Creating a Project

mvn archetype:generate -DgroupId=com.test.example -DartifactId=JCG-BootReactor-Example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

4. Adding Maven dependencies

The next step is to add the appropriate Maven dependencies to the project. We will add these dependencies:

  • spring-boot-starter-web: This dependency marks the project as a web project and adds the dependencies a web application needs
  • reactor-bus: This dependency brings all the Reactor-related dependencies onto the project classpath
  • spring-boot-starter-test: This dependency collects all the test-related JARs, such as JUnit and Mockito, into the project

5. Project skeleton

Here we create multiple packages so that the code remains modular, which makes the project easier to extend.

6. Understanding the sample application

This application receives updates from an external provider about the location of a shipment being delivered to a customer at a given address. Once the application receives an update, it performs several operations:

  • Update the location of the shipment in the database
  • Send a notification to the user's mobile device
  • Send an email notification
  • Send an SMS to the user

We choose reactive behavior for these operations because the user does not depend on them happening in exact real time. They are mostly background tasks which can take a little longer, and the user won't be highly affected if the status update for a shipment is a few minutes late. Let's get started with the model creation first.
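The idea of running these operations as background tasks can be sketched with plain JDK classes. This is only an illustration and not Reactor code: the class `BackgroundTasksSketch`, its method, and the task labels are hypothetical, and each task merely records that it ran instead of touching a database or sending a message.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: runs the four follow-up operations in the background
// so the code that received the location update is not blocked by them.
class BackgroundTasksSketch {

    // Records which operations have finished; thread-safe because tasks run concurrently.
    final List<String> completed = new CopyOnWriteArrayList<>();

    // Starts every operation asynchronously and returns a future that
    // completes once all of them have finished.
    CompletableFuture<Void> onLocationUpdate(String shipmentId) {
        CompletableFuture<?>[] tasks = {
            CompletableFuture.runAsync(() -> completed.add("database:" + shipmentId)),
            CompletableFuture.runAsync(() -> completed.add("push:" + shipmentId)),
            CompletableFuture.runAsync(() -> completed.add("email:" + shipmentId)),
            CompletableFuture.runAsync(() -> completed.add("sms:" + shipmentId))
        };
        return CompletableFuture.allOf(tasks);
    }

    public static void main(String[] args) {
        BackgroundTasksSketch sketch = new BackgroundTasksSketch();
        CompletableFuture<Void> all = sketch.onLocationUpdate("42");
        System.out.println("Update accepted, follow-up work runs in the background");
        all.join(); // wait here only so the demo does not exit before the tasks finish
        System.out.println("Finished operations: " + sketch.completed.size());
    }
}
```

The caller gets control back immediately after `onLocationUpdate` returns; the order in which the four tasks finish is not guaranteed.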

7. Defining the POJO Model

We will start by defining our POJO, which represents a shipment being sent to a customer and has fields such as the shipment ID, the current location and so on. Let's define this POJO here:

Shipment.java

package com.test.example.model; 
public class Shipment {
    private String shipmentId;
    private String name;
    private String currentLocation;
    private String deliveryAddress;
    private String status; 
    //standard setters and getters
}

We have defined some basic fields here and added getters and setters for them.
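The "standard setters and getters" comment stands for the usual JavaBean accessors. As a sketch of what they might look like for two of the fields, assuming the conventional naming (the class name `ShipmentSketch` is hypothetical and is used only to avoid clashing with the real model class):

```java
// Sketch of conventional JavaBean accessors for two of the Shipment fields.
// The remaining fields would follow the same pattern.
class ShipmentSketch {

    private String shipmentId;
    private String currentLocation;

    public String getShipmentId() {
        return shipmentId;
    }

    public void setShipmentId(String shipmentId) {
        this.shipmentId = shipmentId;
    }

    public String getCurrentLocation() {
        return currentLocation;
    }

    public void setCurrentLocation(String currentLocation) {
        this.currentLocation = currentLocation;
    }

    public static void main(String[] args) {
        ShipmentSketch shipment = new ShipmentSketch();
        shipment.setShipmentId("1");
        shipment.setCurrentLocation("Warehouse");
        System.out.println(shipment.getShipmentId() + " is at " + shipment.getCurrentLocation());
    }
}
```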

8. Defining the Service

Next, we define the ShipmentService interface, which is the contract for the functionality.

ShipmentService.java

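package com.test.example.service;

import com.test.example.model.Shipment;

public interface ShipmentService {
    // Declares InterruptedException because the implementation sleeps to simulate slow work.
    void shipmentLocationUpdate(Shipment shipment) throws InterruptedException;
}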

We only have one method definition in this interface. Let's now implement this service, using Thread.sleep to simulate a slow operation.

ShipmentServiceImpl.java

package com.test.example.service;

import com.test.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class ShipmentServiceImpl implements ShipmentService {

    private final Logger LOG = LoggerFactory.getLogger("ShipmentService");

    @Override
    public void shipmentLocationUpdate(Shipment shipment) throws InterruptedException {
        LOG.info("Shipment data: {}", shipment.getShipmentId());
        // Simulate a slow operation such as a database write or a remote call.
        Thread.sleep(3000);
        LOG.info("Shipment with ID: {} reached at knoldus!!!", shipment.getShipmentId());
    }
}

9. Defining the Event Consumer

Here we declare an event handler which listens for events.

EventHandler.java

package com.test.example.handler;

import com.test.example.model.Shipment;
import com.test.example.service.ShipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.bus.Event;
import reactor.fn.Consumer;

@Service
public class EventHandler implements Consumer<Event<Shipment>> {

    private final ShipmentService shipmentService;

    @Autowired
    public EventHandler(ShipmentService shipmentService) {
        this.shipmentService = shipmentService;
    }

    @Override
    public void accept(Event<Shipment> shipmentEvent) {
        // Unwrap the payload that the publisher placed on the bus.
        Shipment shipment = shipmentEvent.getData();
        try {
            shipmentService.shipmentLocationUpdate(shipment);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

The good thing about this consumer class is that it receives the Shipment object itself from the event bus, wrapped in an Event, so no further conversion is needed before calling the service.
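To make the publish and consume pattern concrete, here is a deliberately tiny in-memory bus written with plain JDK classes. It is a sketch of the general idea only and does not reflect how Reactor's EventBus is implemented: `TinyEventBus`, `on` and `publish` are hypothetical names, and delivery here is synchronous on the caller's thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical minimal bus: consumers register under a key and receive
// every payload that is later published under the same key.
class TinyEventBus<T> {

    private final Map<String, List<Consumer<T>>> consumers = new ConcurrentHashMap<>();

    // Registers a consumer for the given key.
    void on(String key, Consumer<T> consumer) {
        consumers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(consumer);
    }

    // Delivers the payload to every consumer registered under the key
    // and returns how many consumers were called.
    int publish(String key, T payload) {
        List<Consumer<T>> registered = consumers.getOrDefault(key, Collections.emptyList());
        for (Consumer<T> consumer : registered) {
            consumer.accept(payload);
        }
        return registered.size();
    }

    public static void main(String[] args) {
        TinyEventBus<String> bus = new TinyEventBus<>();
        List<String> received = new ArrayList<>();
        bus.on("eventHandler", received::add);
        bus.publish("eventHandler", "shipment-1");
        bus.publish("unknownKey", "shipment-2"); // no consumer registered, nothing delivered
        System.out.println(received); // prints [shipment-1]
    }
}
```

A real event bus adds what this sketch leaves out, most importantly dispatching to consumers on other threads so that the publisher is not blocked.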

10. Defining the Java Configuration

We can configure Reactor with Java-based configuration in our application. Let's define the beans here:

ReactorConfig.java

package com.test.example.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;

@Configuration
public class ReactorConfig {

    @Bean
    Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

Clearly, there is nothing special here. We simply initialize the Reactor Environment and create an EventBus that dispatches events on a thread pool, leaving the pool size at its default.
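What a thread pool does with submitted work can be illustrated with the JDK's own ExecutorService. This is a stand-in for illustration, not Reactor's dispatcher: the class `ThreadPoolSketch` is hypothetical, and the pool size of four and the ten tasks are assumptions chosen for the demo.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: submits tasks to a fixed-size pool and records
// which worker threads actually ran them.
class ThreadPoolSketch {

    // Runs taskCount trivial tasks on a pool of poolSize threads and
    // returns the names of the threads that executed at least one task.
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(4, 10);
        System.out.println("Tasks ran on " + names.size() + " worker thread(s): " + names);
    }
}
```

However many tasks are submitted, they never run on more threads than the pool size; the exact number of threads used in a given run can be lower.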

11. Defining the Spring Boot Main Class

Here we use the main class to run our application.

Application.java

package com.test.example;

import com.test.example.handler.EventHandler;
import com.test.example.model.Shipment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.bus.Event;
import reactor.bus.EventBus;

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class Application implements CommandLineRunner {

    private final Logger LOG = LoggerFactory.getLogger("Application");
    private final EventBus eventBus;
    private final EventHandler eventHandler;

    @Autowired
    public Application(EventBus eventBus, EventHandler eventHandler) {
        this.eventBus = eventBus;
        this.eventHandler = eventHandler;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        // Register the handler for events published under the "eventHandler" key.
        eventBus.on($("eventHandler"), eventHandler);

        // Publish messages here
        for (int i = 0; i < 10; i++) {
            Shipment shipment = new Shipment();
            shipment.setShipmentId(String.valueOf(i));
            eventBus.notify("eventHandler", Event.wrap(shipment));
            LOG.info("Published shipment number {}.", i);
        }
    }
}

We have used the CommandLineRunner interface so that this class runs code with which we can test the producer and configuration classes we wrote. In this class, we publish messages to the specified topic and listen for them in the consumer class.

In the next section, we will run our project with a simple Maven command.

12. Running the project

Run the application using Maven:

mvn spring-boot:run

13. Conclusion

In this lesson, we looked at a simple way of defining a thread pool executor with four threads and a consumer which uses this thread pool to handle events in parallel. One of the most common problems in applications that rely on asynchronous execution is that they run out of memory quickly, because multiple threads occupy heap space and create objects as they process events. It is therefore important to assign an adequate heap size when starting the application, and that size depends directly on the size of the thread pool defined for the application.
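Before settling on a heap size, it can help to see what the running JVM actually has available. The following sketch uses only the standard Runtime API; the class `HeapInfoSketch` and its method names are hypothetical.

```java
// Hypothetical helper that reports the JVM's heap limit and CPU count,
// two figures worth checking before choosing a thread pool size.
class HeapInfoSketch {

    // Maximum amount of heap the JVM will attempt to use, in megabytes.
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    // Number of processors available to the JVM.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MB): " + maxHeapMegabytes());
        System.out.println("Available processors: " + processors());
    }
}
```

The heap limit itself is set with the standard -Xmx JVM option, for example -Xmx512m. The right value depends on the workload, which this sketch cannot determine.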

Reference

https://stackabuse.com/spring-reactor-tutorial/
