Spring Reactor


In this blog, we’ll introduce the Spring Reactor project (Project Reactor) and explain why it matters. The idea is to take advantage of the Reactive Streams Specification to build non-blocking reactive applications on the JVM.

Using this knowledge, we’ll build a simple reactive application and compare it to a traditional blocking application.

Reactive applications are the “hot new thing”, and a lot of applications are switching to this model. You can read more about this in The Reactive Manifesto.

Conventional APIs are Blocking

Modern applications deal with a high number of concurrent users and data. Moore’s law is no longer holding as it used to. The hardware capabilities, although increasing, are not keeping up with modern applications where performance is very important.

Java developers write blocking code by default. It’s just how the API was set up. Another example is the traditional servlet (Tomcat) approach: each request occupies its own thread, which waits for the whole background process to finish before it can send the response back.

This means that our data layer logic is blocking the application by default since Threads idly wait for a response. It’s wasteful to not reuse these Threads for a different purpose, while we wait for the response to get back.
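As a rough illustration of this cost, here is a small sketch that uses only the JDK, not a servlet container. The class name, the `slowLookup` helper and the pool size are made up for the example: a fixed pool of two threads stands in for a server's worker threads, and `Thread.sleep` stands in for a slow data layer call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingPoolDemo {

    // Hypothetical stand-in for a slow database or remote call.
    static void slowLookup(long delayMillis) {
        try {
            Thread.sleep(delayMillis); // the worker thread does nothing useful while it waits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs `requests` lookups on a pool of `workers` threads and
    // returns the elapsed wall-clock time in milliseconds.
    static long handle(int requests, int workers, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            long start = System.nanoTime();
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                pending.add(pool.submit(() -> slowLookup(delayMillis)));
            }
            for (Future<?> f : pending) {
                f.get(); // wait until every request has been served
            }
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 4 requests, 2 threads, 200 ms each: the last two requests have to queue.
        System.out.println("elapsed ms: " + handle(4, 2, 200));
    }
}
```

With four requests and two threads, the run needs about two full delays. The threads spend almost all of that time sleeping, yet the queued requests cannot use them.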


Note: This might be a problem if we have limited resources or a process takes too much time to execute.

Asynchronous still Blocks

In Java, you can write code asynchronously using Callbacks and Futures. You can then get and join threads at some later point in time and process the result. Java 8 introduced a new class, CompletableFuture, which makes it much easier to coordinate these things.

It works in a simple fashion – when a single process ends, another one starts. After the second one ends, the results are combined into a third process.

This makes it a lot easier to coordinate your application, but it’s still ultimately blocking: it runs the work on other Threads, and the calling Thread has to wait as soon as it calls the .join() method.
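A minimal sketch of that flow is shown here, using only the JDK. The `fetchUser` and `fetchOrderCount` helpers and their return values are hypothetical placeholders for real remote calls.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureDemo {

    // Hypothetical lookups; in a real application these would be remote calls.
    static String fetchUser() {
        return "John";
    }

    static int fetchOrderCount(String user) {
        return 3;
    }

    static String buildSummary() {
        // First process: runs asynchronously on the common ForkJoinPool.
        CompletableFuture<String> user =
                CompletableFuture.supplyAsync(CompletableFutureDemo::fetchUser);

        // Second process: starts only once the first one has ended.
        CompletableFuture<Integer> orders =
                user.thenApplyAsync(CompletableFutureDemo::fetchOrderCount);

        // Third process: combines both results.
        CompletableFuture<String> summary =
                user.thenCombine(orders, (name, count) -> name + " has " + count + " orders");

        // join() makes the calling thread wait until the whole pipeline is done.
        return summary.join();
    }

    public static void main(String[] args) {
        System.out.println(buildSummary()); // prints "John has 3 orders"
    }
}
```

The stages themselves are chained without blocking, but the final `join()` still parks the caller until everything has completed.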


Reactive Programming

What we want is code that is both asynchronous and non-blocking. A group of developers from companies like Netflix, Pivotal, Red Hat, etc. got together and converged on something called The Reactive Streams Specification.

Project Reactor is the Spring ecosystem’s implementation of The Reactive Streams Specification. It is the library favored by the Spring Webflux module, although Webflux can also work with other reactive libraries such as RxJava.

The idea is to operate Asynchronously with Backpressure using Publishers and Subscribers.

Here, we’re being introduced to several new concepts! Let’s explain them one by one:

  • Publisher – A Publisher is a provider of a potentially unbounded number of elements.
  • Subscriber – A Subscriber listens to that Publisher, asking for new data. Sometimes, it’s also referred to as a Consumer.
  • Backpressure – The ability of the Subscriber to let the Publisher know how many elements it can handle at a time. So it’s the Subscriber that is responsible for the flow of the data, not the Publisher, as it just provides the data.
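These three concepts can be sketched with the JDK's own copy of the Reactive Streams interfaces, `java.util.concurrent.Flow` (Java 9+), so that the example runs without any extra dependency. This is not Reactor code: `SubmissionPublisher` is the JDK's ready-made Publisher, and the `OneAtATimeSubscriber` class is made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureDemo {

    // A Subscriber that asks the Publisher for exactly one element at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for one more element
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // would block if the subscriber's buffer were full
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```

The Publisher never pushes more than the Subscriber has requested. The `request(1)` calls are the backpressure signal.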

The Reactor Project offers 2 types of publishers. These are considered the main building blocks of Spring Webflux:

  • Flux – is a publisher that produces 0 to N values. It could be unbounded. Operations that return multiple elements use this type.
  • Mono – is a publisher that produces 0 or 1 value. Operations that return a single element use this type.

Developing Reactive Applications

With all of the above in mind, let’s jump into creating a simple web application and take advantage of this new reactive paradigm!

The simplest way to start with a skeleton Spring Boot project, as always, is using Spring Initializr. Select your preferred version of Spring Boot and add the “Reactive Web” dependency. After this, generate it as a Maven project and you’re all set!

Let’s define a simple POJO – Greeting:

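public class Greeting {
    private String msg;

    public Greeting() {}
    public Greeting(String msg) { this.msg = msg; }
    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }
}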


Defining a Publisher

Alongside it, let’s define a simple REST Controller with an adequate mapping:

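import org.reactivestreams.Publisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class GreetReactiveController {
    @GetMapping("/greetings")
    public Publisher<Greeting> greetingPublisher() {
        // generate() emits one Greeting per downstream request; take(50) ends the stream after 50 items
        Flux<Greeting> greetingFlux = Flux.<Greeting>generate(sink -> sink.next(new Greeting("Hello"))).take(50);
        return greetingFlux;
    }
}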


Calling Flux.generate() will create a never-ending stream of Greeting objects.

The take() method, as the name suggests, will only take the first 50 values from the stream.

It’s important to note that the return type of the method is the asynchronous type Publisher<Greeting>.

To test this endpoint, navigate your browser to http://localhost:8080/greetings or use the curl client on your command line – curl localhost:8080/greetings

Server-Sent Events

Server-Sent Events (SSE) are another kind of publisher, and they have been in use ever since their arrival.

These events allow a web page to get updates from a server in real-time.

Let’s define a simple reactive server:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Greeting> sseGreetings() {
    Flux<Greeting> delayElements = Flux
            .<Greeting>generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
            .delayElements(Duration.ofSeconds(1));
    return delayElements;
}


Alternatively, we could’ve defined this:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Greeting> events() {
    Flux<Greeting> greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
    Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
    return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
}


Defining a Consumer

Now let’s see the consumer side of it. It’s worth noting that you don’t need to have a reactive publisher in order to use reactive programming on the consuming side:

public class Person {
    private int id;
    private String name;
    // Constructor with getters and setters
}


And then we have a traditional RestController with a single mapping:

@RestController
public class PersonController {
    private static List<Person> personList = new ArrayList<>();
    static {
        personList.add(new Person(1, "John"));
        personList.add(new Person(2, "Jane"));
        personList.add(new Person(3, "Max"));
        personList.add(new Person(4, "Alex"));
        personList.add(new Person(5, "Aloy"));
        personList.add(new Person(6, "Sarah"));
    }
    @GetMapping("/person/{id}")
    public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
            throws InterruptedException {
        Thread.sleep(delay * 1000);
        return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
    }
}



Our producer app is running on port 8080. Now let’s say that we want to call the /person/{id} endpoint 5 times. We know that, by default, each response takes a 2-second delay due to “network lag”.

Using RestTemplate

Calling the endpoint five times in a row with a blocking RestTemplate takes a little over 10 seconds, as expected (5 calls at 2 seconds each). This is how Spring MVC works by default.

In this day and age, waiting a little over 10 seconds for a result on a page is unacceptable. This is the difference between keeping a customer/client and losing them because they had to wait too long.

Spring Webflux introduced a new client for making web requests, called WebClient. Compared to RestTemplate, this client has a more functional feel and is fully reactive. It’s included in the spring-boot-starter-webflux dependency and it’s built to replace RestTemplate in a non-blocking way.

Let’s rewrite the same client code, this time using WebClient. If we only build the five requests in a loop and stop there, nothing happens, as expected.

This is because we are not subscribing. The call is asynchronous, but it also doesn’t kick off until we call the .subscribe() method. This is a common stumbling block for people who are new to Spring Reactor, so keep an eye out for it.

Let’s change our main method and add subscribe:

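// Assumes the producer app from above is listening on localhost:8080.
WebClient client = WebClient.create("http://localhost:8080");
for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
}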


Adding the method changes the outcome: the requests are sent, but the .subscribe() method doesn’t sit and wait for the responses. Since it doesn’t block, the loop finishes before any response has been received at all.

Could we counter this by chaining .block() at the end of the method calls?

for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
}


Result: each call now blocks until its response arrives, so the requests run one after another again and we are back to roughly 2 seconds per request.

The way to fix all of these problems is simple: We make a list of type Mono and wait for all of them to complete, rather than waiting for each one:

List<Mono<Person>> list = Stream.of(1, 2, 3, 4, 5)
    .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
    .collect(Collectors.toList());
Mono.when(list).block();



This is what we’re aiming for. This time, it took just over two seconds, even with massive network lag. This increases the efficiency of our application drastically and really is a game-changer.

If you look closely at the threads, Reactor is reusing them rather than creating new ones. This is really important if your application handles many requests in a short span of time.
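The difference between waiting for each response in turn and waiting once for all of them can also be sketched without Spring. The example below swaps in the JDK's CompletableFuture for Mono, so it only illustrates the idea and is not the WebClient code itself. The `fetchPerson` helper is hypothetical and simulates the network lag with a timer (`CompletableFuture.delayedExecutor`, Java 9+) instead of a real HTTP call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class WaitForAllDemo {

    // Hypothetical remote call: completes after delayMillis without occupying a thread meanwhile.
    static CompletableFuture<String> fetchPerson(int id, long delayMillis) {
        Executor timer = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "person-" + id, timer);
    }

    // Waits for each response before sending the next request.
    static long oneByOne(int calls, long delayMillis) {
        long start = System.nanoTime();
        for (int id = 1; id <= calls; id++) {
            fetchPerson(id, delayMillis).join();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Sends every request first, then waits once for all of them.
    static long allAtOnce(int calls, long delayMillis) {
        long start = System.nanoTime();
        List<CompletableFuture<String>> pending = IntStream.rangeClosed(1, calls)
                .mapToObj(id -> fetchPerson(id, delayMillis))
                .collect(Collectors.toList());
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("one by one  ms: " + oneByOne(5, 200));
        System.out.println("all at once ms: " + allAtOnce(5, 200));
    }
}
```

With five calls and a 200 ms delay, the first variant needs at least a second, while the second should finish in little more than a single delay. That mirrors the 10 seconds versus 2 seconds seen above.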

Conclusion

In this blog, we discussed the need for reactive programming and Spring’s implementation of it, Project Reactor. We also compared a blocking client with a reactive WebClient calling the same endpoint.

References

https://spring.io/reactive

Written by 

KRISHNA JAISWAL is a Software Consultant Trainee at Knoldus. He is passionate about Java and MySQL, and has knowledge of C, C++ and much more. He is recognised as a good team player, a dedicated and responsible professional, and a technology enthusiast. He is a quick learner and curious to learn new technologies. His hobbies include reading books, listening to music and playing cricket.