“If errors and failures are passed to the right component, which can handle them as notifications, the application can become more fault-tolerant or resilient. So if we build our system to be event-driven, we can more easily achieve scalability and failure tolerance, and a scalable, decoupled, and error-proof application is fast and responsive to users.”
Nickolay Tsvetinov
Reactive, in layman’s terms, describes how quickly a client reacts to the streaming data sent by the server. In the Java community, it refers to asynchronous, non-blocking I/O processing. It relies on an event-driven system to stay responsive to users.
The Reactive Streams API is a product of collaboration between engineers from Kaazing, Netflix, Pivotal, Red Hat, Twitter, Typesafe, and many others. It provides a powerful mechanism to address many problems in event-driven systems, and it abstracts away the plumbing so that developers can focus on their business logic.
A quick walk-through of error handling approaches using Spring Boot.
1. Simply catch the error (onErrorResume)
The following code has a Flux named “stringFlux” that emits three elements and then signals a RuntimeException. In case of an error, control jumps to the “onErrorResume” block, which is analogous to a catch block in Java exception handling. We are using StepVerifier to subscribe to the events and verify the event flow.
@Test
public void fluxErrorHandling() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorResume((e) -> { // on error this block runs; we return a fallback Flux
                System.out.println(e);
                return Flux.just("default");
            });
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .expectError(RuntimeException.class) // this expectation fails: the error was already handled
            .verify();
}
Running the above code snippet gives:
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(RuntimeException); actual: onNext(default))
This happens because, as soon as the exception occurs, control jumps to the onErrorResume block, which prints the exception and returns a Flux with “default” as its value, so the subscriber never sees an error. The element “D” is never emitted either, because the error terminates the upstream sequence before the second concatWith is reached. To correct the failing assertion, refer to the test case below:
@Test
public void fluxErrorHandling() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorResume((e) -> { // on error this block runs; we return a fallback Flux
                System.out.println(e);
                return Flux.just("default");
            });
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .verifyComplete();
}
Event Sequence :-
15:12:52.127 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:12:52.135 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:12:52.138 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
java.lang.RuntimeException: Exception Occurred
15:12:52.139 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:12:52.140 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
Note: The updated test case uses verifyComplete() instead of expectError(...).verify().
This is because a reactive sequence terminates with exactly one of onComplete() or onError(), never both. Without any error handling, the sequence above would end with onError() and no onComplete() event. Since onErrorResume swaps the error for a fallback Flux, the event stream now ends with onComplete().
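To make the terminal signals visible without StepVerifier, here is a minimal sketch that records every signal a subscriber receives, with and without onErrorResume. The class and method names (TerminalSignalDemo, recordSignals, failing) are made up for this illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalSignalDemo {

    // Subscribes to the given Flux and records every signal it receives, in order.
    public static List<String> recordSignals(Flux<String> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("onNext(" + value + ")"),
                error -> signals.add("onError(" + error.getMessage() + ")"),
                () -> signals.add("onComplete()"));
        return signals;
    }

    // Same source as in the tests above: three elements followed by an error.
    public static Flux<String> failing() {
        return Flux.just("a", "b", "c")
                .concatWith(Flux.error(new RuntimeException("Exception Occurred")));
    }

    public static void main(String[] args) {
        // No handling: the sequence ends with onError and never sees onComplete.
        System.out.println(recordSignals(failing()));
        // With onErrorResume: the error is replaced, so the sequence ends with onComplete.
        System.out.println(recordSignals(failing().onErrorResume(e -> Flux.just("default"))));
    }
}
```

The first line printed ends with onError(Exception Occurred), the second with onNext(default) followed by onComplete(). Everything here runs synchronously on the calling thread, which is why the list can be returned right after subscribe().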
Now, let’s have a look at other ways of error handling in reactive streams.
2. Catch the error and return static value using onErrorReturn()
This is a classic technique where we detect the error and return a static fallback value as our handling mechanism. In contrast to the technique above, onErrorReturn() takes a plain value instead of a Flux.
@Test
public void fluxErrorHandling_onErrorReturn() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorReturn("default"); // here returning a simple string on any error
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("default")
            .verifyComplete();
}
Event Sequence :-
15:13:48.969 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:13:48.974 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:13:48.978 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:13:48.983 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
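onErrorReturn also has an overload that takes an exception type, so the fallback applies only to errors we expect. The sketch below assumes reactor-core on the classpath; the class and helper names (OnErrorReturnByType, failingWith, collect) are invented for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorReturnByType {

    // Emits "a", "b", "c" and then fails with the supplied exception.
    public static Flux<String> failingWith(RuntimeException error) {
        return Flux.just("a", "b", "c").concatWith(Flux.error(error));
    }

    // Falls back to "default" only for IllegalArgumentException; other errors propagate.
    public static List<String> collect(RuntimeException error) {
        return failingWith(error)
                .onErrorReturn(IllegalArgumentException.class, "default")
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Matching type: the fallback value is appended and the sequence completes.
        System.out.println(collect(new IllegalArgumentException("bad input")));
        try {
            // Non-matching type: block() rethrows the original exception.
            collect(new IllegalStateException("boom"));
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Restricting the fallback by type keeps unexpected failures visible instead of silently turning every error into “default”.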
3. Catch the error and translate it to a custom exception (onErrorMap)
Nothing fancy about the piece of code below. onErrorMap() is a common way to translate a low-level error into a meaningful business exception. Unlike the two techniques above, it does not recover from the error: the sequence still terminates with onError(), only with the custom exception type.
@Test
public void fluxErrorHandling_onErrorMap() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e)); // here translating any error into a CustomException
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectError(CustomException.class)
            .verify();
}
Event Sequence is as follows:-
15:15:51.883 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:15:51.890 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:15:51.935 [main] ERROR reactor.Flux.OnErrorResume.1 - onError(CustomException: Exception Occurred)
15:15:51.937 [main] ERROR reactor.Flux.OnErrorResume.1 -
CustomException: Exception Occurred
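The test above uses a CustomException whose definition is not shown. One possible shape is sketched below; this class is an assumption, written so that it keeps both the original message and the original cause. The wrapper class OnErrorMapDemo and the terminalError helper are also invented for the example, and reactor-core is assumed to be on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

public class OnErrorMapDemo {

    // Hypothetical business exception; the real CustomException may look different.
    public static class CustomException extends RuntimeException {
        public CustomException(Throwable cause) {
            super(cause.getMessage(), cause); // keep the message and the original cause
        }
    }

    // Runs the pipeline and returns the error that finally reaches the subscriber.
    public static Throwable terminalError() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Flux.just("a", "b", "c")
                .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
                .onErrorMap(CustomException::new)
                .subscribe(value -> { }, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = terminalError();
        System.out.println(error.getClass().getSimpleName() + ": " + error.getMessage());
        System.out.println("cause: " + error.getCause());
    }
}
```

Keeping the cause means the original stack trace is still available when the business exception is logged further up.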
4. Catch the error and retry the same stream for a specific number of times (retry)
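A retry mechanism is useful when you lose the connection to the source emitting the data and would like to re-establish it. retry(n) re-subscribes to the same upstream sequence up to n times after an error, so elements emitted before the failure are delivered again on each attempt.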
@Test
public void fluxErrorHandling_withRetry() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e))
            .retry(2);
    // Retry re-subscribes, so the same elements are emitted again: 1 original attempt + 2 retries
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectError(CustomException.class)
            .verify();
}
The event sequence below shows the original attempt followed by the 2 retries: the onNext events for a, b, and c repeat three times before the final onError.
Event Sequence :-
15:16:48.154 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
15:16:48.160 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.164 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.217 [main] ERROR reactor.Flux.Retry.1 - onError(CustomException: Exception Occurred)
15:16:48.219 [main] ERROR reactor.Flux.Retry.1 -
CustomException: Exception Occurred
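In the test above the source fails on every attempt, so the retries can never succeed. To show what retry buys us when the failure is transient, here is a small sketch with a made-up flaky source that fails on its first two subscriptions and succeeds on the third. The names (RetryResubscribeDemo, run) and the use of Flux.defer with a counter are assumptions for this illustration only; reactor-core is assumed on the classpath.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class RetryResubscribeDemo {

    // Collects everything emitted by a flaky source wrapped in retry(2).
    // The counter is incremented once per subscription to the source.
    public static List<String> run(AtomicInteger subscriptions) {
        Flux<String> flaky = Flux.defer(() -> {
            int attempt = subscriptions.incrementAndGet();
            Flux<String> head = Flux.just("a", "b", "c");
            // Hypothetical transient failure: attempts 1 and 2 fail, attempt 3 succeeds.
            return attempt < 3
                    ? head.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
                    : head.concatWith(Flux.just("D"));
        });
        return flaky.retry(2).collectList().block();
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        System.out.println(run(subscriptions));           // [a, b, c, a, b, c, a, b, c, D]
        System.out.println("subscriptions: " + subscriptions.get()); // subscriptions: 3
    }
}
```

Note that a, b, and c are delivered three times. If downstream processing is not idempotent, replaying elements like this has to be accounted for.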
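5. Catch the error and retry with a backoff (retryBackoff)
Use this when you would like to wait for a specific duration before attempting each retry. retryBackoff(2, Duration.ofSeconds(5)) retries up to 2 times, starting with a backoff of 5 seconds that grows exponentially with random jitter.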
@Test
public void fluxErrorHandling_withRetryBackoff() {
    Flux<String> stringFlux = Flux.just("a", "b", "c")
            .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
            .concatWith(Flux.just("D"))
            .onErrorMap((e) -> new CustomException(e))
            .retryBackoff(2, Duration.ofSeconds(5)); // when you want to perform a backoff before retry
    StepVerifier.create(stringFlux.log())
            .expectSubscription()
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectNext("a", "b", "c")
            .expectError(IllegalStateException.class)
            .verify();
}
Event Sequence:-
The log shows each retry running on a parallel scheduler thread after its backoff delay (roughly 7 seconds, then roughly 10 seconds), followed by the final error signal.
15:18:11.551 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:18:11.630 [main] INFO reactor.Flux.RetryWhen.1 - onSubscribe(SerializedSubscriber)
15:18:11.636 [main] INFO reactor.Flux.RetryWhen.1 - request(unbounded)
15:18:11.650 [main] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.750 [parallel-2] ERROR reactor.Flux.RetryWhen.1 - onError(java.lang.IllegalStateException: Retries exhausted: 2/2)
15:18:28.753 [parallel-2] ERROR reactor.Flux.RetryWhen.1 -
java.lang.IllegalStateException: Retries exhausted: 2/2
As the event stream above shows, once all the retries are exhausted you get an IllegalStateException rather than the CustomException.
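One caveat: to the best of my knowledge, retryBackoff was deprecated in Reactor 3.3.4 and removed in 3.4, in favour of retryWhen with a Retry spec. The sketch below shows the same idea with that newer API, assuming reactor-core 3.3.4 or later. The class name RetryBackoffDemo is made up, and the backoff is shortened to 10 ms so the example finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetryBackoffDemo {

    public static List<String> run() {
        return Flux.just("a", "b", "c")
                .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
                // Up to 2 retries, exponential backoff with jitter starting at 10 ms.
                .retryWhen(Retry.backoff(2, Duration.ofMillis(10)))
                // Turn the terminal error into a value so it can be inspected.
                .onErrorResume(e -> Flux.just(
                        "retryExhausted=" + Exceptions.isRetryExhausted(e)
                                + ", cause=" + e.getCause().getMessage()))
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // Three attempts in total, then the description of the "retries exhausted" error.
        System.out.println(run());
    }
}
```

The “retries exhausted” error is an IllegalStateException that wraps the last failure as its cause, so the original exception is still reachable through getCause().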
References
- My motivation for writing this blog comes from this course: https://www.udemy.com/course/build-reactive-restful-apis-using-spring-boot-webflux/learn/lecture/12962854#overview
