Batch Operations in a Reactive Stream – Using buffer with Project Reactor


Reactive streams support potentially infinite data and handle backpressure. But what if we want to perform a batch operation on top of a reactive stream? Can we batch over an infinite stream? In this blog, we will see how to batch with a Project Reactor reactive stream without breaking the streaming boundary and without losing backpressure support.

The Problem Space

In my project, one of the microservice's primary tasks is to index documents in Elasticsearch. It currently accepts the following stream of requests and indexes the documents one by one by calling the Elasticsearch client.

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) { // code goes here }
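For context, the one-by-one implementation looks roughly like the sketch below. Here indexAsync is a hypothetical method, standing in for whatever single-document reactive call the real Elasticsearch client exposes:

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {
    // one network round trip per document
    return requestFlux.flatMap(esclient::indexAsync);
}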

As an optimization, we want to increase the indexing speed by leveraging the Elasticsearch bulk API, which accepts a list of documents at once and indexes them in one go, giving us a faster indexing speed. The challenge is that we don't want to collect the Flux<IndexRequest> into a single List<IndexRequest>, as that would break the streaming boundary and forfeit backpressure support. Look at the snippet below, which attempts to solve the problem that way:

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {

    // collect every IndexRequest into a single list
    // (this blocks until the entire stream completes!)
    List<IndexRequest> indexRequests = requestFlux
            .publishOn(scheduler)
            .collectList()
            .block();

    // send it to the Elasticsearch bulk client
    return esclient.bulkAsync(indexRequests);
}

The above code snippet breaks the streaming boundary: the first statement waits for every element in the request Flux to arrive before proceeding. The number of documents might run into the millions, which would overwhelm the JVM's memory.

Solution – Using buffer in the Flux class

One approach to solving this is the buffer method provided by the Reactor library in the Flux class.

As per the method's documentation, it emits the buffered elements as a List as soon as the buffer size reaches maxSize, and it does so within the boundary of the stream. The data is served gracefully because we are not performing a collect on the entire Flux at once.
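To see the operator in isolation, here is a minimal, self-contained example that batches ten integers into groups of three:

Flux.range(1, 10)
    .buffer(3)
    .subscribe(System.out::println);
// prints: [1, 2, 3]  [4, 5, 6]  [7, 8, 9]  [10]

Note that the final, partial batch is still emitted when the stream completes, so no element is lost. With that behavior in mind, let us look at the bulk indexing use case again: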

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {

    // buffer the stream into batches and index each batch with one bulk call
    return requestFlux
            .publishOn(scheduler)
            .buffer(BATCH_SIZE)             // emits List<IndexRequest> of up to BATCH_SIZE elements
            .flatMap(esclient::bulkAsync);  // dummy bulk Elasticsearch client taking List<IndexRequest> as input
}
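Because bulkAsync is asynchronous, flatMap is used so that each bulk call is subscribed to and its responses are flattened back into a single Flux<ElasticResponse>. One caveat: buffer(BATCH_SIZE) waits for a full batch, so a slow or sparse stream can leave the last few requests sitting in the buffer indefinitely. If latency matters, Reactor's bufferTimeout flushes a partial batch after a time limit as well. A minimal sketch, assuming the same dummy esclient as above and a java.time.Duration import:

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {
    return requestFlux
            .publishOn(scheduler)
            // flush when BATCH_SIZE is reached OR 500 ms elapse, whichever comes first
            .bufferTimeout(BATCH_SIZE, Duration.ofMillis(500))
            .flatMap(esclient::bulkAsync);
}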

There might be many different approaches to solving this problem, and I would appreciate any discussion around simplifying it or solving it in a different way! Hope it helped!


Written by Manish Mishra