Reactive Spring: Define a REST endpoint as a continuous stream

Table of contents
Reading Time: 2 minutes

In the REST APIs, all Http requests are stateless. We fire the request and get the response, That’s it. It does not keep any state for any HTTP request. The connection between client and server is lost once the transaction ends, so 1 response for 1 request.

But sometimes, we get the requirement to have a continuous response for a single request. This continuous response is called Streaming response. So in this blog, we will see how we can do that in Spring.

Here, I am assuming, we are already having a logic which is creating continuous output. I am just going to show how we can wrap the continuous output in a streaming response and can send it to the client. So let’s begin.

For the continuous output, we will be using the below piece of code:

Flux.interval(Duration.ofSeconds(1));

The above code will produce a sequential long value starting from 0, in the interval of 1 second infinitely.

To send the streaming response, we need to set the produced media type as follows:

MediaType.APPLICATION_STREAM_JSON_VALUE

The full code for GetMapping:

@GetMapping(value = “/streaming”, produces = MediaType.APPLICATION_STREAM_JSON_VALUE)

The full code for the controller:

@RestController
public class StreamingController {

@GetMapping(value = “/streaming”, produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Long> getItemsStream(){

return Flux.interval(Duration.ofSeconds(1)); // You can write your own logic here.
}
}

That’s it for the controller. It’s very simple.

Now, let’s write the test cases to test the controller behavior.

First, we need to add below dependencies in pom.xml:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>

Now, write the test file:

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureWebTestClient
public class StreamingControllerTest {

@Autowired
WebTestClient webTestClient;

@Test
public void fluxStream() {

Flux<Long> longStreamFlux = webTestClient.get().uri(“/streaming”)
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.expectStatus().isOk()
.returnResult(Long.class)
.getResponseBody();
StepVerifier.create(longStreamFlux)
.expectNext(0l)
.expectNext(1l)
.expectNext(2l)
.thenCancel()
.verify();
}
}

Here, we are using WebTestClient to test the REST endpoint and StepVerifier to test the Flux output. You can see, we are expecting 3 elements and then we have called cancel. As it is a streaming response, so we would not get onComplete event here so we would need to cancel the request explicitly to verify the behavior. You can get the full code here.

I hope, this blog will help you to define the streaming endpoint for Spring.

blog_footer

Written by 

Rishi is a tech enthusiast with having around 10 years of experience who loves to solve complex problems with pure quality. He is a functional programmer and loves to learn new trending technologies. His leadership skill is well prooven and has delivered multiple distributed applications with high scalability and availability by keeping the Reactive principles in mind. He is well versed with Scala, Akka, Akka HTTP, Akka Streams, Java8, Reactive principles, Microservice architecture, Async programming, functional programming, distributed systems, AWS, docker.

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading