1. Overview
In this article, we’ll get an introduction to the Spring Reactor project and why it matters. Reactor builds on the Reactive Streams Specification so that we can write non-blocking reactive applications on the JVM.
Using this knowledge, we’ll try to build a simple reactive application and compare it to a traditional blocking application.
2. Conventional APIs are blocking
Modern applications deal with a large number of concurrent users and a large volume of data. Java developers write blocking code by default; it’s simply how most of the APIs were set up. The traditional servlet (Tomcat) approach is an example: each request gets its own thread, which waits for the whole background process to finish before it sends the response back.
This means that our data layer logic blocks the application by default, since threads sit idle while they wait for a response. It’s wasteful not to reuse these threads for other work while we wait for the response to come back.
We can write cleaner code using Java’s callbacks and Future/CompletableFuture, and join the threads at some later point in our program. This makes things a lot easier to coordinate, but it is still blocking: it creates threads, and the caller waits as soon as it calls .join().
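To make that last point concrete, here is a minimal sketch that uses only the JDK. The slowLookup() method is a hypothetical stand-in for a slow data-layer call, and the class and method names are ours for illustration. The first method hands the work to another thread but then parks the caller in join(); the second registers a callback with thenApply() and returns right away.

```java
import java.util.concurrent.CompletableFuture;

final class BlockingJoinSketch {

    // Hypothetical stand-in for a slow data-layer call.
    static int slowLookup() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 42;
    }

    // The work runs on another thread, but the caller still waits inside join().
    static int blockingStyle() {
        CompletableFuture<Integer> future =
            CompletableFuture.supplyAsync(BlockingJoinSketch::slowLookup);
        return future.join() * 2;
    }

    // The transformation is registered as a callback; the caller is not parked here.
    static CompletableFuture<Integer> callbackStyle() {
        return CompletableFuture
            .supplyAsync(BlockingJoinSketch::slowLookup)
            .thenApply(value -> value * 2);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());
        // join() is used here only so the demo does not exit before the result arrives.
        System.out.println(callbackStyle().join());
    }
}
```

Even in the callback version, someone eventually has to wait for the result unless the whole call chain is written in the same style, which is the gap a reactive library tries to close.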
3. Reactive Programming
Before we look at Reactor, we should look at the Reactive Streams Specification. This is what Reactor implements, and it lays the groundwork for the library.
Essentially, Reactive Streams is a specification for asynchronous stream processing.
One of its main goals is to address the problem of backpressure. If a producer emits events faster than the consumer can process them, the consumer will eventually be overwhelmed and run out of system resources.
Backpressure means that our consumer should be able to tell the producer how much data to send in order to prevent this, and this is what is laid out in the specification.
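As a rough illustration of that contract, the sketch below uses java.util.concurrent.Flow, the JDK’s own copy of the Reactive Streams interfaces (available since Java 9), together with the JDK’s SubmissionPublisher. This is not Reactor’s API; it is used here only so the example runs without extra dependencies, and the class and method names are hypothetical. The subscriber asks for one element at a time, so the publisher can never hand it more than it has requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowBackpressureSketch {

    // Publishes every element of source to a subscriber that requests one item at a time.
    static List<Integer> consumeOneAtATime(List<Integer> source) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // initial demand: a single element
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ask for the next one only when ready
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            source.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been delivered

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(consumeOneAtATime(List.of(1, 2, 3, 4, 5)));
    }
}
```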
3.1 Dependencies
In this article, we’ll resolve the Reactor dependencies with Maven. We need the following two entries:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.6</version>
</dependency>
We have included the Logback library as well because we will be logging the output of Reactor in order to understand the flow better.
4. Producing a stream of Data
For an application to be reactive, the first logical step is to have a stream of data to work with. Without this data, the application has nothing to react to.
Reactor Core provides two data types that let us produce a stream of data:
- Flux
- Mono
4.1 Flux
A Flux is a stream that can emit 0..n elements:
Flux<Integer> streamOfIntegers = Flux.just(1, 2, 3, 4, 5);
In this case, we have a static stream of 5 elements.
4.2 Mono
A Mono is a stream that can emit 0..1 elements:
Mono<Integer> streamOfIntegers = Mono.just(1);
Here, we are limited to at most one element in the produced stream.
5. Subscribing to a Stream
Now that we have a high-level understanding of how to create a stream of data, let’s look at how to subscribe to it.
5.1 Collecting elements
We will use the subscribe() method to collect the data from the stream. See the example given below.
List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4, 5)
  .log()
  .subscribe(elements::add);

assertThat(elements).containsExactly(1, 2, 3, 4, 5);
The data starts flowing only after subscribe() is called. The log() call adds logging so that we can follow the flow by reading the logs.
5.2 Understanding the flow of elements
With the logging in place, the logs look like this:
20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(5)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()
Let’s go through the logs to understand things better.
- onSubscribe() : This is called when we subscribe to a stream.
- request(unbounded) : When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available.
- onNext() : This is called on every single element.
- onComplete() : This is called last, after receiving the last element. There’s actually an onError() as well, which would be called if there were an exception, but in this case, there isn’t one.
Behind the scenes, subscribe() creates a Subscriber for us. To understand that better, let’s implement the Subscriber interface directly.
Flux.just(1, 2, 3, 4, 5)
  .log()
  .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
  });
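The other half of this exchange is the publisher. The following is a simplified sketch of a synchronous, array-backed publisher written against the JDK’s java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones. ArrayPublisher and recordSignals() are hypothetical names, and this is not how Reactor’s FluxArray is actually implemented; it only shows how demand passed to request() turns into onNext() calls followed by onComplete().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalOrderSketch {

    // Hypothetical publisher that replays a fixed array to each subscriber, synchronously.
    static final class ArrayPublisher implements Flow.Publisher<Integer> {
        private final int[] items;

        ArrayPublisher(int... items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;        // next element to emit
                private long demand;      // elements requested but not yet emitted
                private boolean draining; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    // Cap at Long.MAX_VALUE, which is treated as unbounded demand.
                    demand = (n > Long.MAX_VALUE - demand) ? Long.MAX_VALUE : demand + n;
                    if (draining) {
                        return; // called from inside onNext(): the outer loop picks it up
                    }
                    draining = true;
                    while (!done && demand > 0 && index < items.length) {
                        demand--;
                        subscriber.onNext(items[index++]);
                    }
                    draining = false;
                    // Simplification: completion is only signalled from within request().
                    if (!done && index == items.length) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes with unbounded demand and records every signal in arrival order.
    static List<String> recordSignals(int... items) {
        List<String> signals = new ArrayList<>();
        new ArrayPublisher(items).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable throwable) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        recordSignals(1, 2, 3, 4, 5).forEach(System.out::println);
    }
}
```

The recorded order matches the shape of the log above: one onSubscribe, one onNext per element, then a single onComplete.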
6. Backpressure
The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.
Backpressure is when a downstream can tell an upstream to send it less data in order to prevent it from being overwhelmed.
We can modify our Subscriber implementation to apply backpressure. Let’s tell the upstream to only send two elements at a time by using request():
Flux.just(1, 2, 3, 4, 5)
  .log()
  .subscribe(new Subscriber<Integer>() {
    private Subscription s;
    int onNextAmount;

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        s.request(2);
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
        onNextAmount++;
        if (onNextAmount % 2 == 0) {
            s.request(2);
        }
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
  });
Now if we run our code again, we’ll see the request(2) getting called followed by two onNext() calls, then request(2), and so on.
23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(5)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()
Essentially, this is reactive pull backpressure. We are asking the upstream to push only a certain number of elements, and only when we are ready.
7. Operating on Reactive Streams
Let’s see how we can perform some basic operations on the stream.
7.1 Mapping data onto the stream
A simple operation that we can perform is applying a transformation. In this case, let’s just double all the numbers in our stream:
List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4, 5)
  .log()
  .map(i -> i * 2)
  .subscribe(elements::add);

assertThat(elements).containsExactly(2, 4, 6, 8, 10);
The function passed to map() is applied every time onNext() is called.
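One way to picture this is an operator that sits between the upstream and the downstream subscriber and transforms each element on its way through. The sketch below uses the JDK’s java.util.concurrent.Flow interfaces (Java 9+) instead of Reactor’s internals; MapSubscriber and doubled() are hypothetical names, and the upstream is played by hand to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class MapOperatorSketch {

    // Hypothetical operator: forwards every signal and transforms each item on the way.
    static final class MapSubscriber<T, R> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super R> downstream;
        private final Function<? super T, ? extends R> mapper;

        MapSubscriber(Flow.Subscriber<? super R> downstream,
                      Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            downstream.onSubscribe(subscription); // demand passes straight through
        }

        @Override
        public void onNext(T item) {
            downstream.onNext(mapper.apply(item)); // the mapper runs once per onNext()
        }

        @Override
        public void onError(Throwable throwable) {
            downstream.onError(throwable);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }

    static List<Integer> doubled(List<Integer> input) {
        List<Integer> collected = new ArrayList<>();
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                collected.add(item);
            }

            @Override
            public void onError(Throwable throwable) {}

            @Override
            public void onComplete() {}
        };
        MapSubscriber<Integer, Integer> map = new MapSubscriber<>(collector, i -> i * 2);

        // Drive the operator by hand, playing the role of the upstream publisher.
        map.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {}

            @Override
            public void cancel() {}
        });
        input.forEach(map::onNext);
        map.onComplete();
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(doubled(List.of(1, 2, 3, 4, 5)));
    }
}
```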
7.2 Combining two streams
We can then make things more interesting by combining another stream with this one. Let’s try this by using the zipWith() method:
List<String> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .zipWith(Flux.range(0, Integer.MAX_VALUE).log(),
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
  .subscribe(elements::add);

assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
Here, we create a second Flux that counts up from zero and pair its elements, one by one, with the elements of our original Flux. We can see how the two work together by inspecting the logs:
20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()
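The log shows both sources being cancelled as soon as the shorter one completes, which is why the very long range is never exhausted. The pairing rule itself can be sketched with plain JDK iterators. This is only an analogy: iterators are pulled synchronously, whereas Reactor pushes elements according to demand, and the class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

final class ZipSketch {

    // Pairs elements position by position and stops as soon as either source runs out.
    static <A, B, R> List<R> zip(Iterator<A> first, Iterator<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        List<R> result = new ArrayList<>();
        while (first.hasNext() && second.hasNext()) {
            result.add(combiner.apply(first.next(), second.next()));
        }
        return result;
    }

    static List<String> example() {
        Iterator<Integer> doubled = List.of(1, 2, 3, 4).stream().map(i -> i * 2).iterator();
        // Effectively endless counter, standing in for Flux.range(0, Integer.MAX_VALUE).
        Iterator<Integer> counter = IntStream.iterate(0, i -> i + 1).boxed().iterator();
        return zip(doubled, counter,
            (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two));
    }

    public static void main(String[] args) {
        example().forEach(System.out::println);
    }
}
```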
8. Conclusion
In this article, we’ve given a high-level, end-to-end overview of Reactor Core. We’ve explained how we can publish and subscribe to streams, apply backpressure, operate on streams, and also handle data asynchronously. This should lay a foundation for writing reactive applications.