Batch Operations in Reactive Stream – Using buffer with Project Reactor

Reading Time: 2 minutes

Reactive streams support infinite data and handle backpressure. But what if we want to perform a batch operation on top of a reactive stream? Can we batch over an infinite stream? In this blog, we will see how to batch with a Project Reactor reactive stream without breaking the streaming boundary and while still handling backpressure.

The Problem Space

In my project, one of the microservice's primary tasks is to index documents in Elasticsearch. It currently accepts the following stream of requests and indexes the documents one by one by calling the Elasticsearch client.

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {
    // code goes here
}

As an optimization, we want to increase indexing speed by leveraging the Elasticsearch bulk API, which accepts a list of documents and indexes them in one go, giving us faster indexing. The challenge is that we don't want to collect the Flux<IndexRequest> into a List<IndexRequest>, as that would break the streaming boundary and lose backpressure support. Look at the snippet below, which naively attempts to solve the problem:

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {

    // make a list of IndexRequest -- blocks until the whole Flux completes
    List<IndexRequest> indexRequests = requestFlux
            .publishOn(scheduler)
            .collect(Collectors.toList())
            .block();

    // send it to the Elasticsearch bulk client
    // ...
}

The above code snippet breaks the streaming boundary: the first statement waits for every element of the request Flux to arrive before proceeding. The document count might run into millions, which would overwhelm the JVM's memory.

Solution – Using buffer in the Flux class

One of the approaches to solve this is to use the buffer method provided by the Reactor library in the Flux class.

As per the method's documentation, buffer groups upstream elements into Lists and emits each List as soon as it holds maxSize elements. This happens within the boundary of the stream, and the data is served gracefully because we are not collecting the entire Flux at once. Let us look at the bulk indexing use case again:
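To see this behavior in isolation, here is a minimal sketch (assuming the reactor-core dependency is on the classpath) that buffers a small Flux of integers into batches of three:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class BufferDemo {
    public static void main(String[] args) {
        // buffer(3) groups upstream elements into Lists of up to 3 elements,
        // emitting each List as soon as it fills; the final List may be smaller.
        List<List<Integer>> batches = Flux.range(1, 7)
                .buffer(3)
                .collectList()
                .block();
        System.out.println(batches); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that downstream demand is translated upstream batch by batch, so backpressure is preserved rather than bypassed.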

Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {

    // group requests into batches and hand each batch to the bulk client
    Flux<ElasticResponse> indexResponse = requestFlux
            .buffer(BATCH_SIZE)            // emits List<IndexRequest> of up to BATCH_SIZE
            .flatMap(esClient::bulkAsync); // dummy bulk Elasticsearch client taking List<IndexRequest> as input

    return indexResponse;
}
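To make the idea concrete, here is a self-contained sketch of the pipeline. The IndexRequest, ElasticResponse, and bulkAsync names below are hypothetical stand-ins for the post's real types and client, and the batch size of 500 is just an illustrative choice:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BulkIndexSketch {
    // Hypothetical stand-ins for the post's request/response types.
    record IndexRequest(String id) {}
    record ElasticResponse(int indexed) {}

    // Dummy bulk client: "indexes" a whole batch and reports how many documents it took.
    static Mono<ElasticResponse> bulkAsync(List<IndexRequest> batch) {
        return Mono.just(new ElasticResponse(batch.size()));
    }

    static Flux<ElasticResponse> indexInElasticSearch(Flux<IndexRequest> requestFlux) {
        return requestFlux
                .buffer(500)                          // up to 500 requests per bulk call
                .flatMap(BulkIndexSketch::bulkAsync); // one bulk call per batch
    }

    public static void main(String[] args) {
        Flux<IndexRequest> requests = Flux.range(1, 1200)
                .map(i -> new IndexRequest("doc-" + i));
        // 1200 requests become three bulk calls: batches of 500, 500, and 200.
        indexInElasticSearch(requests).doOnNext(System.out::println).blockLast();
    }
}
```

flatMap is used instead of map here because an asynchronous bulk client would return a Mono (or similar publisher) per batch, which needs to be flattened back into the response Flux.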

There might be many different approaches to solving this, and I would appreciate any discussion around simplifying the problem or solving it in a different way. Hope it helped!


Written by 

Manish Mishra is a Lead Software Consultant with more than 7 years of experience. His primary development technology was Java. He fell for the Scala language, finding it innovative, interesting, and fun to code with. He has also co-authored a journal paper titled "Economy Driven Real Time Deadline Based Scheduling". His interests include learning about cloud computing products and technologies, and algorithm design. He finds books and literature his favorite companions in solitude, and counts stories, spiritual fiction, and time-travel fiction among his favorites.