Working with Project Reactor: Reactive Streams

The words “Reactive” and “Streams” often go hand in hand. The Streams API introduced in Java 8 is a great tool for pipelining operations over your data, but it is not the only kind of stream you can have. In this blog, I’d like to talk about an awesome project called Project Reactor.

Working with streams can be helpful for multiple use cases, but I would like to emphasize two specific advantages:

  • We can pipeline our operations on a stream one after the other and then evaluate the resulting stream with a terminal operation, all without altering the state of our data source or collection.
  • These operations are evaluated lazily, which in general gives us a processing advantage.
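
Both points can be sketched with the plain Java 8 Streams API. This is only an illustration: the class name LazyStreamSketch and the method buildPipeline are hypothetical, and peek() is used here purely to record which elements the pipeline actually touches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamSketch {

    // Builds a pipeline of intermediate operations; no element is processed here.
    // Every element that later flows through the pipeline is recorded in 'visited'.
    static Stream<String> buildPipeline(List<String> source, List<String> visited) {
        return source.stream()
                .peek(visited::add)          // record each element that is pulled
                .map(String::toUpperCase)    // transform, without touching the source
                .limit(2);                   // stop after two elements
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "d");
        List<String> visited = new ArrayList<>();

        Stream<String> pipeline = buildPipeline(source, visited);
        System.out.println(visited);   // [] : nothing has run yet

        // The terminal operation triggers the evaluation.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println(result);    // [A, B]
        System.out.println(visited);   // [a, b] : "c" and "d" were never processed
        System.out.println(source);    // [a, b, c, d] : the source is unchanged
    }
}
```

Because evaluation is deferred to the terminal operation, the pipeline only does as much work as the result needs.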

But nothing comes without tradeoffs, and streams are no exception. A stream is produced by a source and consumed by a subscriber, and we have to consider the possibility that the consumer will not process elements at the same speed at which the producer emits them: it may be slower or faster. This is where Project Reactor provides its major advantage.

Project Reactor is an implementation of the Reactive Streams specification. The main feature of the specification is that it provides a channel of communication between the stream producer and the stream consumer, so that the consumer can demand elements according to its processing capacity. This communication also happens asynchronously and is known as reactive backpressure. With reactive backpressure, we can avoid both swamping the stream consumer and leaving it idle.

In order to work with streams, we need an entity that initiates the stream, i.e. a producer. Reactive Streams gives us an interface for this known as Publisher. Reactor implements this interface in the form of two classes, Mono and Flux.

To represent a stream of 0..1 elements, we use a Mono. Let’s have a look at an example for more clarity:

Mono<String> streamOfAString = Mono.just("test-string");
streamOfAString
    .log()
    .subscribe(System.out::println);

Here we create a stream of one element using the “just” method.

To represent a stream of 0..n elements, we can use the Flux implementation of the Publisher interface:

Flux<String> streamOfStrings = Flux.just("string1","string2","string3");
streamOfStrings
    .log()
    .subscribe(System.out::println);

Here we do something similar, except that the stream represents more than one element. A natural question is why we need Mono at all when we already have Flux. Well, there are use cases where a Mono makes more sense, such as a function that is meant to return either one value or no value at all. Think about it this way: we wouldn’t use a list of strings to capture the return value of a function that is meant to return just one, would we?

A published stream is of no use unless it is subscribed to, which is what we did in the code above. The data doesn’t start flowing until we subscribe to the stream. In the examples above, we subscribed with a command to print every element of the stream. Let’s understand what really happens under the hood when we subscribe to a stream.

[Screenshot: log output]

First in the order of execution is the onSubscribe method, which establishes the subscription. Using this subscription we can request elements from the stream. Next in line is the request(unbounded) call. The request method asks for a number of elements from the stream, and by default subscribe() requests an unbounded number. After that, we can see the onNext calls, one per element, where the operation we provided is applied to each element of the stream. Lastly, the onComplete method executes after the last element has been received.

This flow is provided by an implementation of the Subscriber interface of Reactive Streams, and it is what gets executed internally on a call to subscribe(). To understand it better, let’s provide our own implementation of Subscriber.

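import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

Flux<String> streamOfStrings = Flux.just("First","second","third");
streamOfStrings
      .log()
      .subscribe(new Subscriber<String>() {
         @Override
         public void onSubscribe(Subscription subscription) {
            // Ask for only two elements instead of the default unbounded demand.
            subscription.request(2);
         }

         @Override
         public void onNext(String s) {
            // Called once for each requested element.
            System.out.println(s);
         }

         @Override
         public void onError(Throwable throwable) {
            // Errors are ignored in this example.
         }

         @Override
         public void onComplete() {
            // Not reached here: only two of the three elements are requested.
         }
      });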

[Screenshot: log output]

We deliberately provided an implementation that requests just two stream elements on subscription. Note that only two elements are processed and onComplete is never called, since the stream hasn’t been consumed through its last element. In onNext we just printed the value, but we could also have requested more elements there, by keeping a reference to the Subscription and calling request() again whenever the consumer is ready for more. That way the streaming works reactively, without swamping the consumer or letting it sit idle.
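
To make the idea of requesting more elements from onNext concrete, here is a minimal sketch. It is an assumption-laden illustration rather than Reactor code: so that it runs on a plain JDK 9+ without extra dependencies, it uses the JDK’s java.util.concurrent.Flow interfaces and SubmissionPublisher, whose Subscriber and Subscription have the same methods as the Reactive Streams ones. The names DemandSketch and PacedSubscriber are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class DemandSketch {

    // Requests two elements up front, then one more each time an element is handled.
    static class PacedSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;   // keep it so onNext can ask for more
            events.add("request(2)");
            subscription.request(2);
        }

        @Override
        public void onNext(String item) {
            events.add("onNext(" + item + ")");
            subscription.request(1);            // ready for one more element
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    // Publishes the items, waits for the terminal signal, and returns what the subscriber saw.
    static List<String> run(String... items) {
        PacedSubscriber subscriber = new PacedSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // closing the publisher signals onComplete once the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run("First", "second", "third"));
    }
}
```

Unlike the subscriber that only calls request(2), this one receives all three elements and then onComplete, because each handled element renews the demand by one.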

So this was a short introduction to reactive streams. Watch the recorded webinar video to explore the key aspects of Reactive Programming in Spring 5.0 with a live demo.

Written by 

Ayush Prashar is a software consultant having more than 1 year of experience. He is familiar with programming languages such as Java, Scala, C, C++ and he is currently working on reactive technologies like Lagom, Akka, Spark, and Kafka. His hobbies include playing table tennis, basketball, watching TV series and football.
