Introduction
Spring Framework has supported reactive programming since version 5, and this support is built on top of Project Reactor.
Project Reactor is a reactive library for building non-blocking applications on the JVM, based on the Reactive Streams Specification. It is the foundation of the reactive stack in the Spring ecosystem and is developed in close collaboration with Spring. WebFlux, Spring’s reactive-stack web framework, requires Reactor as a core dependency.
Reactive Streams Specification
Before looking at Reactor, we should understand the Reactive Streams Specification. This is what Reactor implements, and it lays the groundwork for the library.
Reactive Streams is a specification for asynchronous stream processing with non-blocking backpressure.
In other words, consider a system where lots of events are produced and consumed asynchronously. Think of a stream of thousands of stock updates per second coming into a financial application, which has to respond to those updates in a timely manner.
One of the main goals of the specification is to address the problem of backpressure. If a producer emits events faster than the consumer can process them, the consumer will eventually be overwhelmed and run out of system resources.
Backpressure means that the consumer is able to tell the producer how much data to send so that this does not happen, and this is what the specification lays out.
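To make the demand signal concrete, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces (available since Java 9), which mirror the Reactive Streams interfaces. It is not Reactor code, and the class name FlowBackpressureSketch and the helper consumeOneAtATime are hypothetical names chosen for illustration. The subscriber asks for exactly one item at a time, so the publisher never delivers more than the subscriber has requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBackpressureSketch {

    // Hypothetical helper: publishes the items and returns what a
    // one-at-a-time subscriber received.
    static List<Integer> consumeOneAtATime(List<Integer> items) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // signal demand for exactly one item
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ask for the next item only when ready
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered

        try {
            // Delivery is asynchronous, so wait for the terminal signal.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneAtATime(List.of(1, 2, 3)));
    }
}
```

The same request(n) handshake is what Reactor's Flux and Mono honor through the org.reactivestreams interfaces.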
Maven Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
Producing a Stream of Data
For an application to be reactive, the first thing it must be able to do is produce a stream of data.
Reactor Core gives us two data types that enable us to do this: Flux and Mono.
Flux
Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
Flux<String> seq1 = Flux.just("hello", "world", "helloworld");
List<String> iterable = Arrays.asList("hello", "world", "helloworld");
Flux<String> seq2 = Flux.fromIterable(iterable);
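The empty and infinite cases described above can be sketched as follows. This is an illustrative example rather than part of the original article: the class and method names are hypothetical, and block() is used only to keep the demo short (blocking is normally avoided in reactive code).

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class FluxSequencesSketch {

    // An empty finite sequence: onComplete arrives without any onNext.
    static List<String> emptySequence() {
        return Flux.<String>empty().collectList().block();
    }

    // Flux.interval never completes on its own, so take(3) bounds it
    // to the first three ticks before collecting.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(emptySequence()); // []
        System.out.println(firstTicks());    // [0, 1, 2]
    }
}
```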
Mono
A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal and then terminates with an onComplete signal (successful Mono, with or without a value), or emits only a single onError signal (failed Mono).
Most Mono implementations are expected to call onComplete on their Subscriber immediately after having called onNext. Mono.never() is an outlier: it doesn’t emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
Mono only offers a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux.
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("hello");
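As a small sketch of the points above (with hypothetical class and method names), the example below shows that an empty Mono completes without a value, and that combining a Mono with another Publisher via concatWith yields a Flux. blockOptional() and block() are used here only to make the results visible.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoSketch {

    // Returns the value if the Mono emitted one, or Optional.empty()
    // if it completed without a value.
    static Optional<String> valueOf(Mono<String> mono) {
        return mono.blockOptional();
    }

    // concatWith combines the Mono with another Publisher,
    // so the result type switches from Mono to Flux.
    static List<String> combined() {
        Flux<String> both = Mono.just("hello").concatWith(Mono.just("world"));
        return both.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(valueOf(Mono.empty()));       // Optional.empty
        System.out.println(valueOf(Mono.just("hello"))); // Optional[hello]
        System.out.println(combined());                  // [hello, world]
    }
}
```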
Subscribing to a Stream
Now that we know how to produce a stream of data, we need to subscribe to it in order for it to emit its elements.
The Subscriber is responsible for receiving messages from a Publisher and processing them. In that sense it plays a role similar to a terminal operation in the Java Streams API: nothing flows until a Subscriber subscribes. It has four methods for dealing with the events it receives:
- onSubscribe(Subscription s) :- This method is called once when the subscriber is registered with a publisher. It receives the Subscription, which the subscriber uses to request data or cancel.
- onNext(T t) :- This method is called on the subscriber every time the publisher emits a new item of generic type T, never exceeding the number of items requested.
- onError(Throwable t) :- This method is called when an error occurs. It is a terminal signal, so no further events follow.
- onComplete() :- This method is called when the stream completes successfully and no more items will be emitted.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
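To see the order in which these four methods are called, here is a minimal sketch that records every signal in a list. The class name SignalOrderSketch and the helper recordSignals are hypothetical, and the sketch assumes a synchronous source such as Flux.just, so that all signals have arrived by the time subscribe() returns.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

class SignalOrderSketch {

    // Hypothetical helper: subscribes with unbounded demand and records
    // each signal in arrival order. Only meaningful for synchronous sources.
    static List<String> recordSignals(Flux<String> source) {
        List<String> signals = new ArrayList<>();
        source.subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                signals.add("onSubscribe");
                s.request(Long.MAX_VALUE); // no limit on demand
            }

            @Override
            public void onNext(String value) {
                signals.add("onNext:" + value);
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError:" + t.getMessage());
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // [onSubscribe, onNext:a, onNext:b, onComplete]
        System.out.println(recordSignals(Flux.just("a", "b")));
        // [onSubscribe, onError:boom]
        System.out.println(recordSignals(Flux.error(new IllegalStateException("boom"))));
    }
}
```

Note that a sequence ends with either onComplete or onError, never both.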
Collecting Elements
Let’s use the subscribe() method to collect all the elements in a stream:
List<String> dataStream = new ArrayList<>();
Flux.just("hello", "world", "helloworld")
    .log()
    .subscribe(dataStream::add);
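Collecting through subscribe() works here because Flux.just emits synchronously. As a hedged alternative sketch, collectList() gathers the elements inside the pipeline, and StepVerifier from the reactor-test dependency listed earlier can assert on the emitted elements in a test. The class and method names below are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class CollectingSketch {

    static Flux<String> source() {
        return Flux.just("hello", "world", "helloworld");
    }

    // collectList() turns the Flux into a Mono<List<String>>;
    // block() is used only to expose the result in this demo.
    static List<String> collect() {
        return source().collectList().block();
    }

    // StepVerifier subscribes to the Flux and throws an AssertionError
    // if the signals differ from the expectations.
    static Duration verify() {
        return StepVerifier.create(source())
                .expectNext("hello", "world", "helloworld")
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [hello, world, helloworld]
        verify();
    }
}
```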
Backpressure
WebFlux is based on Reactor, which is a Reactive Streams implementation. One of the key features of Reactive Streams is handling backpressure.
Backpressure is a way of dealing with a data stream that may at times be too large to be processed reliably. Its goal is to feed data to subscribers at the rate at which they can reliably handle it. The unprocessed data can be buffered (or we could choose a different strategy), hence the pressure analogy. Picture water pressure and a firefighter’s hose, as in the featured picture. The firefighter only lets out as much water as she can deal with.
In short, backpressure is the ability of the consumer to signal to the producer that the rate of emission is too high.
Let’s look at this in a more technical way. The idea behind Reactive Streams is to enable a hybrid pull-push approach to data streams. The subscriber requests only a specific amount of data, and the source pushes at most that much. If the data stream is very large, the data waiting to be processed can be handled by a buffering strategy.
The example below shows how the Subscriber can limit the pace of emission by invoking the request(n) method on the Subscription.
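Buffering is one strategy; dropping is another. The following sketch uses Reactor's onBackpressureDrop operator to illustrate it, with a subscriber that requests only three items and never asks for more. The class name and fields are hypothetical. With this setup, the items beyond the requested three should be handed to the drop callback instead of reaching the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class DropStrategySketch {

    final List<Integer> received = new ArrayList<>();
    final List<Integer> dropped = new ArrayList<>();

    void run() {
        Flux.range(1, 10)
            // Items arriving while there is no downstream demand
            // are passed to this callback and discarded.
            .onBackpressureDrop(dropped::add)
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(3); // demand three items and nothing more
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                }
            });
    }

    public static void main(String[] args) {
        DropStrategySketch sketch = new DropStrategySketch();
        sketch.run();
        System.out.println("received: " + sketch.received);
        System.out.println("dropped:  " + sketch.dropped);
    }
}
```

Swapping in onBackpressureBuffer() would keep the extra items in memory until the subscriber requests them, at the cost of unbounded buffer growth if it never does.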
@Test
public void backpressureDemo() {
    Flux.range(1, 8)
        .subscribe(new Subscriber<Integer>() {
            private Subscription s;
            private int count;

            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Method onSubscribe");
                this.s = s; // keep the Subscription so onNext can request more
                System.out.println("Here requesting 2 emissions");
                s.request(2);
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("onNext " + i);
                count++;
                // After every second item, ask for the next batch of two.
                if (count % 2 == 0) {
                    System.out.println("Here requesting 2 emissions");
                    s.request(2);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Method onError");
            }

            @Override
            public void onComplete() {
                System.out.println("Method onComplete");
            }
        });
}
Run the above code and you will see that two values are emitted at a time, as requested:
Method onSubscribe
Here requesting 2 emissions
onNext 1
onNext 2
Here requesting 2 emissions
onNext 3
onNext 4
Here requesting 2 emissions
onNext 5
onNext 6
Here requesting 2 emissions
onNext 7
onNext 8
Here requesting 2 emissions
Method onComplete
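The same batching behavior can also be sketched with Reactor's BaseSubscriber, which manages the Subscription for us and exposes request(n) directly. This is an alternative sketch, not the article's original code: the class name and the helper requestsFor are hypothetical, and doOnRequest is added only to record the demand that reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class BatchingSubscriberSketch {

    // Hypothetical helper: consumes `total` items in batches of `batch`
    // and returns every demand value the source observed.
    static List<Long> requestsFor(int total, int batch) {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, total)
            .doOnRequest(requests::add) // record each request(n) seen upstream
            .subscribe(new BaseSubscriber<Integer>() {
                private int count;

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(batch); // initial demand
                }

                @Override
                protected void hookOnNext(Integer value) {
                    // Ask for the next batch once the current one is consumed.
                    if (++count % batch == 0) {
                        request(batch);
                    }
                }
            });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(requestsFor(8, 2)); // [2, 2, 2, 2, 2]
    }
}
```

The final request is issued after the eighth item, just before the source signals completion, which is why five requests are recorded for eight items.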
Conclusion
In this post, we’ve seen how to publish and subscribe to streams with Project Reactor, and how to apply backpressure.