The world is a stage where all of us are artists and Constant learning is the foundation of success.
In order to continue your learning with something new, here were are going to learn deep into Spring Reactor.
Here we will look into Advance details about Spring Reactor like Backpressure, Operation, and more.
Backpressure
Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.
We can tell upstream to only send two elements at a time by using request():
Flux.just(2, 4, 8, 10)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
int onNextValue;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
ssubscription.request(2);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextValue++;
if (onNextValue % 2 == 0) {
subscription.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
Output:-
request(2)
onNext(2)
onNext(4)
request(2)
onNext(8)
onNext(10)
request(2)
onComplete()
Operating on a Stream
We can do operations on our stream as follows:
Mapping Data in Streams
Let’s just add 5 to all the numbers in our stream:
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i + 2)
.subscribe(elements::add);
Combining Two Different Streams
We can use zip() method to combine two streams:
Flux.just(2, 4, 6, 8)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, 10),
(one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
Output:
"First Flux: 4, Second Flux: 0"
"First Flux: 8, Second Flux: 1"
"First Flux: 12, Second Flux: 2"
"First Flux: 16, Second Flux: 3"
Schedulers
It provides some guarantees required by Reactive Streams flows like FIFO execution.
You can use or create efficient schedulers to jump thread on the producing flows or receiving flows.
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.publishOn(Schedulers.single())
.subscribe(elements::add);
ParallelFlux
They can starve your CPU’s from any sequence whose work can be subdivided into concurrent tasks. Turn back into a Flux with ParallelFlux.sequential() as an unordered join.
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
Hot Streams
A more realistic use case for reactive might be something that happens infinitely.
Hot streams are always running and can be subscribed to at any point in time, but missing the start of the data.
We can do it as follows:
ConnectableFlux
Converting a cold stream into a Hot stream.
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.publish();
publish.subscribe(System.out::println);
publish.subscribe(System.out::println); //allowing us to add multiple subscriptions
publish.connect(); //Flux will start emitting:
Throttling
Running our code can lead our console to get overwhelmed with logging.
We can use sample() method with an interval of two seconds. So, values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.sample(ofSeconds(2))
.publish();
publish.connect(); //Flux will start emitting:
This is pretty much from the blog, if you liked the article, please give me a thumbs up and I will keep writing blogs like this for you in the future as well. Keep reading and Keep coding.