Spring Reactor – Backpressure

In this blog, we are going to learn about backpressure in Project Reactor.

What is Backpressure

Backpressure is the ability of a consumer to signal to the producer that the rate of emission is higher than it can handle. With this mechanism, the Subscriber controls how fast data flows from the Publisher: it calls request(n) on its subscription to ask for at most n elements at a time.
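As a minimal sketch of the request(n) handshake described above, the following subscriber asks for elements in fixed-size batches and records every demand signal that reaches the publisher. The class name RequestBatchSketch and the helper consumeInBatches are hypothetical names used only for illustration; BaseSubscriber, doOnRequest and Flux.range come from Reactor core.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class RequestBatchSketch {

    // Hypothetical helper: consumes `total` elements in batches of `batchSize`
    // and returns the demand signals (the n of each request(n)) seen by the source.
    static List<Long> consumeInBatches(int total, int batchSize) {
        List<Long> demands = new ArrayList<>();

        Flux.range(1, total)
            .doOnRequest(n -> demands.add(n)) // record each request(n)
            .subscribe(new BaseSubscriber<Integer>() {
                private int receivedInBatch = 0;

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    // Ask for the first batch instead of the default unbounded demand
                    request(batchSize);
                }

                @Override
                protected void hookOnNext(Integer value) {
                    // Once the current batch is fully received, ask for the next one
                    if (++receivedInBatch == batchSize) {
                        receivedInBatch = 0;
                        request(batchSize);
                    }
                }
            });

        return demands;
    }

    public static void main(String[] args) {
        System.out.println(consumeInBatches(10, 4)); // prints [4, 4, 4]
    }
}
```

With 10 elements and a batch size of 4, the source sees three requests of 4. The last batch is only partly filled because the source completes after the tenth element.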

Backpressure in Reactive Streams

Due to the non-blocking nature of reactive programming, the server doesn’t send the complete stream at once. It can push data as soon as it is available, so the client waits less time to receive and process the events. But there are issues to overcome. In software systems, backpressure arises when traffic overloads a communication channel: emitters of information overwhelm consumers with more data than they are able to process.

Controlling Backpressure

Basically, there are three strategies to follow:

  • Send new events only when the subscriber requests them: This is a pull strategy, in which elements are gathered only at the subscriber's request
  • Limit the number of events to receive at the client side: Working as a limited push strategy, the publisher can send only a maximum number of items to the client at once
  • Cancel the data streaming when the consumer cannot process more events: In this case, the receiver can abort the transmission at any given time and subscribe to the stream again later

Implementing Backpressure Mechanism

To implement the examples, we’ll simply add the Spring WebFlux starter and Reactor test dependencies to our pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Request

import org.junit.jupiter.api.Test; // assuming JUnit 5
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
    Flux<Integer> request = Flux.range(1, 50);

    request.subscribe(
      System.out::println,
      err -> err.printStackTrace(),
      () -> System.out.println("All 50 items have been successfully processed!!!"),
      subscription -> {
          // Signal demand for all 50 elements as five requests of 10 each
          for (int i = 0; i < 5; i++) {
              System.out.println("Requesting the next 10 elements!!!");
              subscription.request(10);
          }
      }
    );

    // Start with zero demand so that each thenRequest(10) pulls the next chunk
    StepVerifier.create(request, 0)
      .expectSubscription()
      .thenRequest(10)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .thenRequest(10)
      .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
      .thenRequest(10)
      .expectNext(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
      .thenRequest(10)
      .expectNext(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
      .thenRequest(10)
      .expectNext(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
      .verifyComplete();
}

Limit

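import org.junit.jupiter.api.Test; // assuming JUnit 5
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@Test
public void whenLimitRateSet_thenSplitIntoChunks() {
    // Operators return a new Flux, so the result of limitRate(10) must be kept
    Flux<Integer> limit = Flux.range(1, 25).limitRate(10);

    // Only 15 of the 25 elements are requested here, so "Finished!!" is never printed
    limit.subscribe(
      value -> System.out.println(value),
      err -> err.printStackTrace(),
      () -> System.out.println("Finished!!"),
      subscription -> subscription.request(15)
    );

    // Start with zero demand so that the thenRequest calls drive the flow
    StepVerifier.create(limit, 0)
      .expectSubscription()
      .thenRequest(15)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .expectNext(11, 12, 13, 14, 15)
      .thenRequest(10)
      .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
      .verifyComplete();
}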
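To see what limitRate does to the demand that actually reaches the source, a small sketch like the following can help. It places doOnRequest upstream of limitRate and collects the requested amounts. The class name LimitRateSketch and the helper upstreamDemand are hypothetical names for illustration only, and the sketch assumes a synchronous source such as Flux.range so that the list is complete when the method returns.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class LimitRateSketch {

    // Hypothetical helper: returns the demand signals that reach the source
    // when a subscriber with unbounded demand sits behind limitRate(rate).
    static List<Long> upstreamDemand(int total, int rate) {
        List<Long> demands = new ArrayList<>();

        Flux.range(1, total)
            .doOnRequest(n -> demands.add(n)) // observes requests sent to the source
            .limitRate(rate)                  // caps the size of each upstream request
            .subscribe();                     // downstream demand is unbounded

        return demands;
    }

    public static void main(String[] args) {
        // Prints the list of request sizes seen by the source
        System.out.println(upstreamDemand(25, 10));
    }
}
```

Even though the subscriber asks for everything at once, the source should first see a request equal to the rate, followed by smaller replenishing requests that never exceed it. The exact replenishment sizes are an implementation detail of Reactor, so they are not hard-coded here.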

Cancel

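import org.junit.jupiter.api.Test; // assuming JUnit 5
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@Test
public void whenCancel_thenSubscriptionFinished() {
    Flux<Integer> cancel = Flux.range(1, 10).log();

    cancel.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnNext(Integer value) {
            request(3);
            System.out.println(value);
            // cancel() runs on the first element, so only the value 1 is printed
            cancel();
        }
    });

    // The verifier consumes three elements and then cancels its own subscription
    StepVerifier.create(cancel)
      .expectNext(1, 2, 3)
      .thenCancel()
      .verify();
}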
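A variation on the cancel strategy is to let the subscriber count what it has received and cancel once it has had enough. The sketch below pulls one element at a time and cancels after a given number of elements. The class name CancelSketch, the helper takeThenCancel and its parameters are hypothetical and only meant as an illustration; doOnCancel is used to confirm that the cancel signal reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class CancelSketch {

    // Hypothetical helper: receives up to `limit` elements from a range of 10,
    // then cancels. `sawCancel` is set when the source observes the cancel signal.
    static List<Integer> takeThenCancel(int limit, AtomicBoolean sawCancel) {
        List<Integer> received = new ArrayList<>();

        Flux.range(1, 10)
            .doOnCancel(() -> sawCancel.set(true))
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // pull the first element only
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                    if (received.size() == limit) {
                        cancel();   // stop the stream: no more elements wanted
                    } else {
                        request(1); // pull the next element
                    }
                }
            });

        return received;
    }

    public static void main(String[] args) {
        AtomicBoolean sawCancel = new AtomicBoolean();
        System.out.println(takeThenCancel(3, sawCancel)); // prints [1, 2, 3]
        System.out.println(sawCancel.get());              // prints true
    }
}
```

Requesting one element at a time keeps the example easy to follow. In practice a larger batch size would usually reduce the number of request signals.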

Conclusion

Handling backpressure can therefore provide availability, robustness, and stability when the publisher overwhelms the consumer with too many events. In summary, it can prevent systemic failures due to high demand.

References

https://www.baeldung.com/spring-webflux-backpressure

https://jstobigdata.com/