Elasticsearch: How to paginate over selected data with Scala using the Elasticsearch Scroll API


Elasticsearch is a real-time, distributed, full-text search and analytics engine built on top of Apache Lucene™. You can read more about it on the official website.

It provides a distributed, multitenant-capable full-text search engine with a RESTful web interface and schema-free JSON documents. Elasticsearch is developed in Java and is released as open source under the terms of the Apache License. Elasticsearch is the second most popular enterprise search engine.

In this post we will learn how to paginate over selected data using the Scroll API of Elasticsearch. The scenario: we will read JSON data from an input file and insert it into an ES index; after that we will issue a search query through the Scroll API and fetch a specified number of records per batch from the scrolled data.

Now, start by adding the Elasticsearch dependency to the project. Here is the snippet from the build.sbt file.

name := "ES-Scroll-API"

scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
  "org.elasticsearch" % "elasticsearch" % "1.5.2",
  "ch.qos.logback" % "logback-classic" % "1.0.13"
)

First of all, we need to create a node in Scala using the Java API, which will interact with the Elasticsearch server running on our machine. I have created a getClient() method which returns a local node client.

def getClient(): Client = {
    // start an embedded local node and obtain a client from it
    val node = nodeBuilder().local(true).node()
    val client = node.client()
    insertBulkDoc(client)
    // refresh (twice, with a short pause) so the bulk-inserted documents become searchable
    val refresh = new RefreshRequest()
    client.admin().indices().refresh(refresh)
    Thread.sleep(2000)
    client.admin().indices().refresh(refresh)
    client
  }
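The embedded local node above is convenient for a self-contained demo. If you would rather talk to an already running external cluster, a minimal sketch using the ES 1.x TransportClient could look like this (the cluster name "elasticsearch" and the transport port 9300 are the defaults and are assumptions here):

import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress

// a minimal sketch, assuming a cluster named "elasticsearch" reachable on localhost:9300
def getRemoteClient(): Client = {
  val settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", "elasticsearch") // assumption: default cluster name
    .build()
  new TransportClient(settings)
    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300))
}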

Elasticsearch is schemaless, so we can index any JSON into it. We have an inputJson file in which each line is a JSON document. The application reads the file line by line and inserts each document into the Elasticsearch index. For this I have created the insertBulkDoc() method, which uses the bulk API to insert a set of documents into the Elasticsearch index.

Here is the complete insertBulkDoc() method.

def insertBulkDoc(client: Client) {
    val sourceBuffer = Source.fromFile("src/main/resources/inputJson.json")
    val bulkJson = sourceBuffer.getLines().toList
    val bulkRequest = client.prepareBulk()
    // add one index request per line, using the 1-based line number as the document id
    for (i <- 0 until bulkJson.size) {
      bulkRequest.add(client.prepareIndex("gnip_index", "twitter", s"${i + 1}").setSource(bulkJson(i)))
    }
    bulkRequest.execute().actionGet()
    sourceBuffer.close()
  }
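The bulk call above discards the response, so individual document failures would go unnoticed. As a small hardening step (a sketch, not part of the original post), the execute line can capture the BulkResponse and check it; hasFailures and buildFailureMessage are part of the ES 1.x BulkResponse API:

    val bulkResponse = bulkRequest.execute().actionGet()
    if (bulkResponse.hasFailures) {
      log.error("bulk insert had failures: " + bulkResponse.buildFailureMessage())
    }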

After this we will fetch the data in chunks through the Scroll API. The Scroll API of ES provides an efficient way to paginate over selected data. Each call to the scroll API returns the next batch of results until there are no more results left to return, i.e. the hits array is empty. The initial search request and each subsequent scroll request return a new _scroll_id; only the most recent _scroll_id should be used to retrieve the current page of data. Note that with the SCAN search type used below, the initial response carries the total hit count but no documents; documents start arriving from the first scroll request. For this I have created the scrollFetch() method. For more information, see the Scroll API section of the Elasticsearch documentation.

Here is the complete scrollFetch() method.


def scrollFetch(client: Client, query: QueryBuilder, indexName: String, outputFileUrl: String,
                   scrollSize: Int = 10, scrollKeepAlive: String = "10m"): Int = {

    var scrollResp = client.prepareSearch(indexName).setSearchType(SearchType.SCAN).
      setScroll(scrollKeepAlive).setQuery(query).setSize(scrollSize).execute().actionGet()
    // with SCAN the initial response carries the total hit count but no documents
    val totalCount = scrollResp.getHits.getTotalHits
    log.info("total hits " + totalCount)
    var successCount = 0
    if (totalCount > 0) {
      val outputFile = new File(outputFileUrl)
      val outputFileName = outputFile.getAbsolutePath
      val outputStream = new FileOutputStream(outputFile)
      val outputWriter = new OutputStreamWriter(outputStream)
      @tailrec
      def fetch() {
        // always scroll with the most recent scroll id, renewing the keep-alive on every call
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(scrollKeepAlive).execute().actionGet()
        log.info("scroll length " + scrollResp.getHits.getHits.length)
        if (scrollResp.getHits.getHits.length == 0) {
          // no more results: release the server-side scroll context
          try {
            client.prepareClearScroll().addScrollId(scrollResp.getScrollId).execute().actionGet()
            return
          } catch {
            case ex: Throwable =>
              log.error("could not clear the scroll due to " + ex)
              return
          }
        } else {
          successCount = writeDataOnLocalFile(scrollResp.getHits.getHits, outputWriter, successCount, outputFileName)
          fetch()
        }
      }
      fetch()
      outputWriter.flush()
      outputWriter.close()
      successCount
    } else successCount
  }
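As a usage sketch (the field name "body" and the search term are hypothetical, not from the original post), the same method can paginate over a filtered subset of the index rather than all documents:

val filteredCount = scrollFetch(client, QueryBuilders.termQuery("body", "elasticsearch"),
  "gnip_index", "/tmp/filteredJson.json", scrollSize = 50, scrollKeepAlive = "5m")

Keep in mind that with the SCAN search type the size is applied per shard, so each batch can contain up to scrollSize * number_of_shards documents.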

Now, for validation, we store the scrolled records in a local file. For this we have a private writeDataOnLocalFile() method which writes the fetched records, one JSON document per line, to the local file system.

Here is the complete writeDataOnLocalFile() method.

 private def writeDataOnLocalFile(searchHit: Array[SearchHit], outputWriter: OutputStreamWriter, successCount: Int, outputFileName: String): Int = {
    var successCountModified = successCount
    var badWriter = false
    for (oneHit <- searchHit) {
      if (!badWriter) {
        try {
          val docId = oneHit.getId
          val dataSource = oneHit.getSourceAsString
          // _id is always a string in ES, so quote it to keep the output line valid JSON
          val writableDoc = s"""{"_id":"${docId}","_source":${dataSource}}\n"""
          outputWriter.write(writableDoc)
          successCountModified += 1
        } catch {
          case ex: IOException =>
            log.error(s"can't write to the file ${outputFileName} due to ${ex}")
            badWriter = true
        }
      }
    }

    successCountModified
  }
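With this quoting in place, each record lands on its own line as standalone JSON. For a hypothetical document indexed with id 1 and source {"user":"alice"}, the written line would be:

{"_id":"1","_source":{"user":"alice"}}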

After this we can delete the index from our node. For this I have created the deleteIndex() method, which takes the client and the index name as arguments.

Here is the complete deleteIndex() method.

 def deleteIndex(client: Client, indexName: String) {
    val deleteIndexRequest = new DeleteIndexRequest(indexName)
    val deleteResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
    if (deleteResponse.isAcknowledged())
      log.info("index is successfully deleted")
    else
      log.warn("index is not deleted ")
  }
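In the 1.x Java API, deleting an index that does not exist throws an IndexMissingException. A guarded variant (a sketch built on IndicesExistsRequest from the same admin API) might look like this:

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest

// a minimal sketch: only issue the delete when the index actually exists
def deleteIndexIfExists(client: Client, indexName: String) {
  val exists = client.admin().indices()
    .exists(new IndicesExistsRequest(indexName)).actionGet().isExists()
  if (exists) deleteIndex(client, indexName)
  else log.warn(s"index ${indexName} does not exist, nothing to delete")
}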

Here is the complete application.

package com.es.narayan

import java.io.OutputStreamWriter
import org.elasticsearch.search.SearchHit
import java.io.FileOutputStream
import org.elasticsearch.action.search.SearchType
import scala.annotation.tailrec
import java.io.IOException
import org.elasticsearch.index.query.QueryBuilder
import java.io.File
import org.elasticsearch.client.Client
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import scala.io.Source
import org.slf4j.LoggerFactory

/**
 * @author narayan
 *
 */
trait ESScrollApi {
  
  
  val log = LoggerFactory.getLogger(this.getClass)

  /**
   * This is scrollFetch method which provides pagination over selected data
   *
   * @param client
   * @param query
   * @param indexName
   * @param outputFileUrl
   * @param scrollSize
   * @param scrollKeepAlive
   * @return
   */
  def scrollFetch(client: Client, query: QueryBuilder, indexName: String, outputFileUrl: String,
                   scrollSize: Int = 10, scrollKeepAlive: String = "10m"): Int = {

    var scrollResp = client.prepareSearch(indexName).setSearchType(SearchType.SCAN).
      setScroll(scrollKeepAlive).setQuery(query).setSize(scrollSize).execute().actionGet()
    // with SCAN the initial response carries the total hit count but no documents
    val totalCount = scrollResp.getHits.getTotalHits
    log.info("total hits " + totalCount)
    var successCount = 0
    if (totalCount > 0) {
      val outputFile = new File(outputFileUrl)
      val outputFileName = outputFile.getAbsolutePath
      val outputStream = new FileOutputStream(outputFile)
      val outputWriter = new OutputStreamWriter(outputStream)
      @tailrec
      def fetch() {
        // always scroll with the most recent scroll id, renewing the keep-alive on every call
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(scrollKeepAlive).execute().actionGet()
        log.info("scroll length " + scrollResp.getHits.getHits.length)
        if (scrollResp.getHits.getHits.length == 0) {
          // no more results: release the server-side scroll context
          try {
            client.prepareClearScroll().addScrollId(scrollResp.getScrollId).execute().actionGet()
            return
          } catch {
            case ex: Throwable =>
              log.error("could not clear the scroll due to " + ex)
              return
          }
        } else {
          successCount = writeDataOnLocalFile(scrollResp.getHits.getHits, outputWriter, successCount, outputFileName)
          fetch()
        }
      }
      fetch()
      outputWriter.flush()
      outputWriter.close()
      successCount
    } else successCount
  }

  /**
   * writeDataOnLocalFile is private method which is used for write page data into local file
   *
   * @param searchHit
   * @param outputWriter
   * @param successCount
   * @param outputFileName
   * @return
   */
  private def writeDataOnLocalFile(searchHit: Array[SearchHit], outputWriter: OutputStreamWriter, successCount: Int, outputFileName: String): Int = {
    var successCountModified = successCount
    var badWriter = false
    for (oneHit <- searchHit) {
      if (!badWriter) {
        try {
          val docId = oneHit.getId
          val dataSource = oneHit.getSourceAsString
          // _id is always a string in ES, so quote it to keep the output line valid JSON
          val writableDoc = s"""{"_id":"${docId}","_source":${dataSource}}\n"""
          outputWriter.write(writableDoc)
          successCountModified += 1
        } catch {
          case ex: IOException =>
            log.error(s"can't write to the file ${outputFileName} due to ${ex}")
            badWriter = true
        }
      }
    }

    successCountModified
  }

  /**
   * This is insertBulkDoc method which is used for read data from file and write into ES index
   *
   * @param client
   */
  def insertBulkDoc(client: Client) {
    val sourceBuffer = Source.fromFile("src/main/resources/inputJson.json")
    val bulkJson = sourceBuffer.getLines().toList
    val bulkRequest = client.prepareBulk()
    // add one index request per line, using the 1-based line number as the document id
    for (i <- 0 until bulkJson.size) {
      bulkRequest.add(client.prepareIndex("gnip_index", "twitter", s"${i + 1}").setSource(bulkJson(i)))
    }
    bulkRequest.execute().actionGet()
    sourceBuffer.close()
  }

  /**
   * deleteIndex method is used for delete index from ES node
   *
   * @param client
   * @param indexName
   */
  def deleteIndex(client: Client, indexName: String) {
    val deleteIndexRequest = new DeleteIndexRequest(indexName)
    val deleteResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
    if (deleteResponse.isAcknowledged())
      log.info("index is successfully deleted")
    else
      log.warn("index is not deleted ")
  }

}

We can run this application by extending the ESScrollApi trait in our main object.

Here is the main object.

package com.es.narayan

import org.elasticsearch.action.admin.indices.refresh.RefreshRequest
import org.elasticsearch.client.Client
import org.elasticsearch.node.NodeBuilder.nodeBuilder
import org.elasticsearch.index.query.QueryBuilders

object ESApiObject extends App with ESScrollApi {


  /**
   * getClient method returns local node client
   *
   * @return
   */
  def getClient(): Client = {
    // start an embedded local node and obtain a client from it
    val node = nodeBuilder().local(true).node()
    val client = node.client()
    insertBulkDoc(client)
    // refresh (twice, with a short pause) so the bulk-inserted documents become searchable
    val refresh = new RefreshRequest()
    client.admin().indices().refresh(refresh)
    Thread.sleep(2000)
    client.admin().indices().refresh(refresh)
    client
  }
  val client = getClient

  val result = scrollFetch(client, QueryBuilders.matchAllQuery(), "gnip_index", "/tmp/outputJson.json")
  log.info("total number of scrolled documents is " + result)
  deleteIndex(client, "gnip_index")
  client.close()
}

After this, run 'sbt run' from the project directory and we will get the expected output on the console as well as in the local file (/tmp/outputJson.json).


Download the source code from GitHub to check the functionality.

About Narayan Kumar

Software Consultant at Knoldus Software LLP.
