Intro to Spring Reactor: Part 2

The world is a stage where all of us are artists, and constant learning is the foundation of success.
To continue learning something new, here we are going to dig deeper into Spring Reactor.

Here we will look at more advanced details of Spring Reactor, such as backpressure, operations on streams, and more.

Backpressure

Backpressure is when a downstream subscriber can tell an upstream publisher to send it less data so that it does not get overwhelmed.
We can tell the upstream to send only two elements at a time by calling request() on the Subscription:

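import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

List<Integer> elements = new ArrayList<>();  // collects every value the subscriber receives

Flux.just(2, 4, 8, 10)
  .log()
  .subscribe(new Subscriber<Integer>() {
    private Subscription subscription;
    int onNextValue;  // number of elements received so far

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);  // initial demand: two elements
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
        onNextValue++;
        if (onNextValue % 2 == 0) {
            subscription.request(2);  // batch consumed, ask for two more
        }
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});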


Output:

request(2)
onNext(2)
onNext(4)
request(2)
onNext(8)
onNext(10)
request(2)
onComplete()
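
The same batched demand can also be sketched with Reactor's BaseSubscriber, which stores the Subscription for us and exposes request() directly. This is only a minimal sketch: the class name BatchedRequestDemo and the helper consumeInBatches are made up for illustration, and the event strings are recorded by hand to mimic the log output above.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BatchedRequestDemo {

    // Hypothetical helper: consumes the source in batches of batchSize and
    // records every demand signal and every received value, in order.
    static List<String> consumeInBatches(Flux<Integer> source, int batchSize) {
        List<String> events = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {
            private int received;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                events.add("request(" + batchSize + ")");
                request(batchSize);  // initial demand
            }

            @Override
            protected void hookOnNext(Integer value) {
                events.add("onNext(" + value + ")");
                if (++received % batchSize == 0) {
                    events.add("request(" + batchSize + ")");
                    request(batchSize);  // batch consumed, ask for the next one
                }
            }

            @Override
            protected void hookOnComplete() {
                events.add("onComplete()");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        consumeInBatches(Flux.just(2, 4, 8, 10), 2).forEach(System.out::println);
    }
}
```

Because Flux.just() emits synchronously on the subscribing thread, the list is already complete when consumeInBatches() returns. With an asynchronous source we would have to wait for completion before reading it.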

Operating on a Stream

We can perform operations on the data in our stream as follows:

Mapping Data in Streams

Let’s add 2 to every number in our stream:

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i + 2)
  .subscribe(elements::add);
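
To see what the mapped stream actually produces, here is a small self-contained sketch. The class MapDemo and the helper addToEach are hypothetical names, and collectList().block() is used only to get the result back as a plain List for the demo.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class MapDemo {

    // Hypothetical helper: adds delta to every value and returns the results.
    static List<Integer> addToEach(int delta, Integer... values) {
        return Flux.fromArray(values)
            .map(i -> i + delta)   // transforms each element as it passes through
            .collectList()         // gathers all results into a single List
            .block();              // waits for completion (fine in a demo, avoid in reactive code)
    }

    public static void main(String[] args) {
        System.out.println(addToEach(2, 1, 2, 3, 4));  // prints [3, 4, 5, 6]
    }
}
```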

Combining Two Different Streams

We can use the zipWith() method to combine one stream with another:

Flux.just(2, 4, 6, 8)
  .log()
  .map(i -> i * 2)
  .zipWith(Flux.range(0, 10), 
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
  .subscribe(elements::add);

Output (the combined stream completes as soon as the shorter source completes, so only four pairs appear):

  "First Flux: 4, Second Flux: 0"
  "First Flux: 8, Second Flux: 1"
  "First Flux: 12, Second Flux: 2"
  "First Flux: 16, Second Flux: 3"
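
The static Flux.zip() does the same job when we prefer to pass both sources as arguments. The sketch below assumes a hypothetical helper called pairUp in a class called ZipDemo, and blocks only to return the result for the demo.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class ZipDemo {

    // Hypothetical helper: pairs elements by position and formats each pair.
    static List<String> pairUp(Flux<Integer> first, Flux<Integer> second) {
        return Flux.zip(first, second,
                (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        Flux<Integer> doubled = Flux.just(2, 4, 6, 8).map(i -> i * 2);
        // Flux.range(0, 10) has ten elements, but zipping stops after the fourth pair.
        pairUp(doubled, Flux.range(0, 10)).forEach(System.out::println);
    }
}
```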

Schedulers

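Reactor uses a Scheduler as its contract for running tasks. A Scheduler provides some of the guarantees that Reactive Streams flows require, such as FIFO execution.
You can use the built-in schedulers or create your own to switch threads on the producing side of a flow (subscribeOn) or on the receiving side (publishOn). In the example below, everything after publishOn() runs on the single-threaded scheduler: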

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .publishOn(Schedulers.single())
  .subscribe(elements::add);
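
To make the thread switch visible, we can record which thread runs the steps before and after publishOn(). This is a rough sketch: PublishOnDemo and observeThreads are hypothetical names, and the exact thread names depend on where the code is called from and on Reactor's internal naming.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class PublishOnDemo {

    // Hypothetical helper: returns {thread that ran map(), thread that ran doOnNext()}.
    static String[] observeThreads() {
        AtomicReference<String> upstream = new AtomicReference<>();
        AtomicReference<String> downstream = new AtomicReference<>();

        Flux.just(1, 2, 3, 4)
            .map(i -> {
                // Before publishOn(): still on the thread that subscribed.
                upstream.set(Thread.currentThread().getName());
                return i * 2;
            })
            .publishOn(Schedulers.single())
            // After publishOn(): signals are delivered on the scheduler's thread.
            .doOnNext(i -> downstream.set(Thread.currentThread().getName()))
            .blockLast();  // waits for the stream to finish (demo only)

        return new String[] {upstream.get(), downstream.get()};
    }

    public static void main(String[] args) {
        String[] threads = observeThreads();
        System.out.println("map ran on:      " + threads[0]);
        System.out.println("doOnNext ran on: " + threads[1]);
    }
}
```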

ParallelFlux

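ParallelFlux lets you put all of your CPU cores to work on any sequence whose work can be subdivided into concurrent tasks. You get one by calling parallel() on a Flux and then runOn() to choose a scheduler, and you turn it back into a regular Flux with ParallelFlux.sequential(), which joins the results without preserving order. Note that the snippet below does not create a ParallelFlux: subscribeOn(Schedulers.parallel()) only moves the whole sequence onto a worker thread of the parallel scheduler, and the elements are still processed one after another.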

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(elements::add);
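
For comparison, an actual ParallelFlux could be sketched as follows. ParallelDemo and doubleInParallel are hypothetical names, and the result is sorted only so that the demo has a stable output, since sequential() does not keep the original order.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class ParallelDemo {

    // Hypothetical helper: doubles every value on parallel rails.
    static List<Integer> doubleInParallel(List<Integer> input) {
        return Flux.fromIterable(input)
            .parallel()                     // splits the sequence into rails (one per CPU core by default)
            .runOn(Schedulers.parallel())   // runs each rail on a worker of the parallel scheduler
            .map(i -> i * 2)                // this step now runs concurrently across rails
            .sequential()                   // merges the rails back into one Flux, order not guaranteed
            .collectSortedList()            // sorts so the demo output is deterministic
            .block();
    }

    public static void main(String[] args) {
        System.out.println(doubleInParallel(List.of(1, 2, 3, 4)));  // prints [2, 4, 6, 8]
    }
}
```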

Hot Streams

A more realistic use case for reactive streams is a source that emits indefinitely.
Hot streams are always running and can be subscribed to at any point in time, so a late subscriber misses the data that was emitted before it subscribed.
We can create one as follows:

ConnectableFlux

One way is to convert a cold stream into a hot stream with publish(), which returns a ConnectableFlux:

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .publish();

publish.subscribe(System.out::println);
publish.subscribe(System.out::println);  // a ConnectableFlux accepts several subscribers before it starts

publish.connect();  // emission starts here; the endless loop above keeps this call from returning
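
Because the endless loop never lets connect() return, it is easier to watch the behaviour with a finite source. The sketch below swaps in Flux.range(1, 3) for that reason. ConnectDemo and run are hypothetical names. It shows that nothing is emitted until connect() is called and that both subscribers receive the same values.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

final class ConnectDemo {

    // Hypothetical helper: records what happens before and after connect().
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A finite source stands in for the endless loop so the demo terminates.
        ConnectableFlux<Integer> hot = Flux.range(1, 3).publish();

        hot.subscribe(i -> log.add("A:" + i));  // nothing is emitted yet
        hot.subscribe(i -> log.add("B:" + i));  // still nothing

        log.add("connecting");
        hot.connect();                          // both subscribers now receive 1, 2, 3

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "connecting" entry is always the first line in the log, which confirms that subscribing alone does not start a ConnectableFlux.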

Throttling

Running this code floods our console with output.
We can use the sample() method with an interval of two seconds, so that only the most recent value is pushed to our subscriber every two seconds, which makes the console a lot less hectic.

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
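  .sample(Duration.ofSeconds(2))  // java.time.Duration; keeps only the latest value in each two-second window
  .publish();

publish.subscribe(System.out::println);
publish.connect();  // the Flux starts emitting once connect() is called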
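
To try sample() without blocking the calling thread forever, we can swap the endless loop for Flux.interval(). This is just a sketch under that assumption. SampleDemo and sampledTicks are hypothetical names, and the exact values returned depend on timing.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class SampleDemo {

    // Hypothetical helper: a fast ticker, sampled at a slower period.
    static List<Long> sampledTicks(Duration tick, Duration window, int count) {
        return Flux.interval(tick)   // emits 0, 1, 2, ... once per tick
            .sample(window)          // keeps only the latest value seen in each window
            .take(count)             // stops after count samples so the demo ends
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Ticks every 10 ms but only lets one value through every 100 ms.
        System.out.println(sampledTicks(Duration.ofMillis(10), Duration.ofMillis(100), 3));
    }
}
```

The values in between are simply dropped, so the returned numbers jump ahead instead of counting up one by one.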

That is pretty much it for this blog. If you liked the article, please give me a thumbs up and I will keep writing blogs like this for you in the future as well. Keep reading and keep coding.

