Elasticsearch: CRUD Operations and Sorting Documents by Timestamp with Scala Using the Elasticsearch Java API


Elasticsearch is an open-source search engine built on top of Apache Lucene™, a full-text search-engine library. You can read more about it on the Elasticsearch website.

Elasticsearch is written in Java and uses Lucene internally for all of its indexing and searching, but it aims to make full-text search easy by hiding the complexities of Lucene behind a simple, coherent, RESTful API.

In this post, we will learn how to use the Elasticsearch Java API from Scala. We will perform CRUD operations on Elasticsearch, then search and sort documents by timestamp, retrieve a specified number of documents from the index, and validate the results.

We will start by adding the Elasticsearch dependency to the project. At the time of writing, 1.5.2 is the latest version. Here is the snippet from the build.sbt file.

name := "crudOnEs"

scalaVersion := "2.11.4"

libraryDependencies ++= {
  Seq(
    "org.elasticsearch" % "elasticsearch" % "1.5.2"
  )
}

First of all, we need to create a node from Scala using the Java API. Our code talks to Elasticsearch through this node. I have created a getClient() method that starts a local node (local(true)) and returns a client connected to it.

def getClient(): Client = {
    val node = nodeBuilder().local(true).node()
    val client = node.client()
    client
  }

Next, we create an index with mappings and settings using the addMappingToIndex() method. We use XContentBuilder to build the mapping JSON object.

val mappingBuilder = (jsonBuilder()
    .startObject()
    .startObject("tweet") // must match the type the documents are indexed under
    .startObject("_timestamp")
    .field("enabled", true)
    .field("store", true)
    .field("path", "post_date")
    .endObject()
    .endObject()
    .endObject())

This mapping is needed when we want to sort on a custom-defined timestamp. We change the following attributes of the built-in "_timestamp" field:

- "enabled" = true turns the timestamp on.
- "store" = true stores the timestamp in the index.
- "path" = "post_date" tells Elasticsearch to take the "_timestamp" value from the "post_date" field of each document.

The root object of the mapping is named after the document type, tweet, because a mapping only applies to documents indexed under that type.

Here is the complete addMappingToIndex() method.

def addMappingToIndex(indexName: String, client: Client): CreateIndexResponse = {

    val settingsStr = ImmutableSettings.settingsBuilder().
      put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()
    client.admin().indices().prepareCreate(indexName)
      .setSettings(settingsStr)
      // register the mapping for the "tweet" type used by insertBulkDocument()
      .addMapping("tweet", mappingBuilder).execute()
      .actionGet()

  }
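Why can post_date act as the timestamp? The default 1.x dynamic mapping should detect a value such as "2015-05-12" as a date. Elasticsearch then stores it internally as milliseconds since the epoch (UTC). The plain-Scala sketch below mimics that conversion. It does not involve Elasticsearch, and the TimestampSketch object is hypothetical. It shows that, for zero-padded yyyy-MM-dd strings, sorting by text and sorting by epoch value give the same order.

```scala
import java.time.{LocalDate, ZoneOffset}

// Hypothetical illustration, plain Scala only: mimics how a date-only value
// such as post_date maps to epoch milliseconds (UTC midnight).
object TimestampSketch {

  // Convert an ISO-8601 date like "2015-05-12" to epoch millis at UTC midnight.
  def toEpochMillis(isoDate: String): Long =
    LocalDate.parse(isoDate).atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli

  def main(args: Array[String]): Unit = {
    val dates = Seq("2015-05-14", "2015-05-12", "2015-05-13")
    // For zero-padded yyyy-MM-dd strings, text order equals epoch order.
    println(dates.sorted)
    println(dates.sortBy(d => toEpochMillis(d)))
    println(toEpochMillis("2015-05-12"))
  }
}
```

This is also why sortByTimeStamp() can simply sort on post_date: the stored values are numbers that follow the calendar order of the dates.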

Elasticsearch is schemaless, so we can index any JSON document without defining a schema first. We have a bulk JSON file in which each line is one JSON document. The application reads the file line by line and indexes each document into Elasticsearch. For this I have created the insertBulkDocument() method, which uses the bulk API to insert a set of documents into the index.

Here is the bulk json file. Each line is a json.

{ "id": 1, "source": "twitter", "data": "tweet 1" , "post_date": "2015-05-12"}
{ "id": 2, "source": "twitter", "data": "tweet 2" , "post_date": "2015-05-13"}
{ "id": 3, "source": "twitter", "data": "tweet 3" , "post_date": "2015-05-14"}
{ "id": 4, "source": "twitter", "data": "tweet 4" , "post_date": "2015-05-15"}
{ "id": 5, "source": "twitter", "data": "tweet 5" , "post_date": "2015-05-16"}
{ "id": 6, "source": "twitter", "data": "tweet 6" , "post_date": "2015-05-17"}
{ "id": 7, "source": "twitter", "data": "tweet 7" , "post_date": "2015-05-18"}
{ "id": 8, "source": "twitter", "data": "tweet 8" , "post_date": "2015-05-19"}
{ "id": 9, "source": "twitter", "data": "tweet 9" , "post_date": "2015-05-20"}
{ "id": 10, "source": "twitter", "data": "tweet 10" , "post_date": "2015-05-21"}

Here is the complete insertBulkDocument() method.

def insertBulkDocument(client: Client): BulkResponse = {
    val bulkJson = Source.fromFile("src/main/resources/bulk.json").getLines().toList
    val bulkRequest = client.prepareBulk()
    for (i <- 0 until bulkJson.size) {
      bulkRequest.add(client.prepareIndex("twitter", "tweet", (i + 1).toString).setSource(bulkJson(i)))
    }
    bulkRequest.execute().actionGet()
  }
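insertBulkDocument() turns every line of bulk.json into a document, so a stray blank line would become an empty source. Elasticsearch would most likely reject that item. The sketch below shows one way to guard against this. It is plain Scala, and the BulkLinesSketch object and its toIdAndSource helper are hypothetical, not part of the original application. It trims the lines, drops blank ones, and assigns ids starting at "1", as insertBulkDocument() does.

```scala
// Hypothetical helper, not part of the original application: turns the lines
// of a newline-delimited JSON file into (id, json) pairs, skipping blank lines
// so that a stray empty line does not become an empty document.
object BulkLinesSketch {

  def toIdAndSource(lines: Seq[String]): Seq[(String, String)] =
    lines
      .map(_.trim)
      .filter(_.nonEmpty)                                 // drop blank lines
      .zipWithIndex                                       // position of each document
      .map { case (json, i) => ((i + 1).toString, json) } // ids start at "1"

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      """{ "id": 1, "source": "twitter", "data": "tweet 1" , "post_date": "2015-05-12"}""",
      "",
      """{ "id": 2, "source": "twitter", "data": "tweet 2" , "post_date": "2015-05-13"}"""
    )
    toIdAndSource(lines).foreach { case (id, json) => println(s"$id -> $json") }
  }
}
```

Each pair could then be passed to client.prepareIndex("twitter", "tweet", id).setSource(json) inside the bulk request. It is also worth checking bulkResponse.hasFailures() after execution, because a bulk request can succeed overall while individual items fail. bulkResponse.buildFailureMessage() describes those failures.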

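After this we can search and sort by timestamp. For this I have created the sortByTimeStamp() method. By default, Elasticsearch sorts search hits by relevance score (_score), not by timestamp, so the method explicitly sorts on post_date in descending order. It uses the QueryBuilders and FilterBuilders APIs to create a filteredQuery. The filter keeps only documents whose post_date lies between 2015-05-13 and 2015-05-19, and setSize(2) limits the response to two documents.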

Here is the complete sortByTimeStamp() method.

 def sortByTimeStamp(client: Client, indexName: String): SearchResponse = {
    val filter = andFilter(rangeFilter("post_date").from("2015-05-13") to ("2015-05-19"))
    val sortedSearchResponse = client.prepareSearch().setIndices(indexName)
      .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filter))
      .setSize(2).addSort("post_date", SortOrder.DESC).execute().actionGet()
    sortedSearchResponse
  }
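To check what sortByTimeStamp() should return for the sample data, here is an in-memory model of the same request. It is plain Scala; the SortByTimestampModel object is hypothetical and this is not the Elasticsearch API. The model assumes that the range filter bounds are inclusive, which is the default for from/to, and compares the zero-padded date strings directly.

```scala
// In-memory model (plain Scala, no Elasticsearch) of what sortByTimeStamp()
// asks for: keep documents whose post_date is in [from, to] (inclusive),
// sort by post_date descending, and return the first `size`, like setSize(2).
object SortByTimestampModel {

  final case class Tweet(id: Int, postDate: String)

  // The ten documents from bulk.json: id n has post_date 2015-05-(11 + n).
  val tweets: Seq[Tweet] = (1 to 10).map(n => Tweet(n, f"2015-05-${11 + n}%02d"))

  def query(docs: Seq[Tweet], from: String, to: String, size: Int): Seq[Tweet] =
    docs
      .filter(t => t.postDate >= from && t.postDate <= to) // inclusive range
      .sortBy(_.postDate)(Ordering[String].reverse)        // newest first
      .take(size)

  def main(args: Array[String]): Unit =
    println(query(tweets, "2015-05-13", "2015-05-19", 2))
}
```

Seven documents (ids 2 to 8) match the range, and the two newest are ids 8 and 7. In the real SearchResponse, getHits.getHits.length would therefore be 2, while getHits.getTotalHits should report all 7 matches.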

We can update a document by adding one or more fields. We use the update API: we create an UpdateRequest with three parameters (index name, type name, and id) and execute it on the client. If the update succeeds, the document's version is incremented. For example, a document at version 1 will be at version 2 after this method is called.

Here is the complete updateIndex() method.

 def updateIndex(client: Client, indexName: String, typeName: String, id: String): UpdateResponse = {
    val updateRequest = new UpdateRequest(indexName, typeName, id)
      .doc(jsonBuilder()
        .startObject()
        .field("gender", "male")
        .endObject())
    client.update(updateRequest).get()
  }
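The effect of updateIndex() on the stored document can be pictured with a small in-memory model. It is plain Scala, not the Elasticsearch API, and the PartialUpdateModel object is hypothetical. The fields of the partial doc are merged into the existing _source, overwriting keys that already exist, and the version increases by one. The model only handles flat documents like the ones in bulk.json, while Elasticsearch merges nested objects recursively.

```scala
// In-memory model (plain Scala, not the Elasticsearch API) of a partial update
// on a flat document: fields from the partial doc are merged into the stored
// _source (overwriting existing keys) and the version goes up by one.
object PartialUpdateModel {

  final case class StoredDoc(source: Map[String, Any], version: Long)

  def applyPartialUpdate(doc: StoredDoc, partial: Map[String, Any]): StoredDoc =
    StoredDoc(doc.source ++ partial, doc.version + 1)

  def main(args: Array[String]): Unit = {
    val tweet1 = StoredDoc(
      Map("id" -> 1, "source" -> "twitter", "data" -> "tweet 1", "post_date" -> "2015-05-12"),
      version = 1)
    // Same change as updateIndex(): add "gender" -> "male"
    val updated = applyPartialUpdate(tweet1, Map("gender" -> "male"))
    println(updated)
  }
}
```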

We can delete a document by id. For this I have created the deleteDocumentById() method, which takes the index name, type name, and id as parameters and returns a DeleteResponse.

Here is the complete deleteDocumentById() method.

  def deleteDocumentById(client: Client, indexName: String, typeName: String, id: String): DeleteResponse = {

    // Use the method's arguments instead of hard-coded index, type, and id
    val delResponse = client.prepareDelete(indexName, typeName, id)
      .execute()
      .actionGet()
    delResponse
  }
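The main object prints isFound() from the DeleteResponse. The plain-Scala sketch below illustrates what that flag means. It is a hypothetical in-memory model, not the Elasticsearch API. The flag is true when a document with that id existed and was removed, and false when there was nothing to delete. As far as I know, deleting a missing id in 1.x returns a response with found set to false rather than throwing an exception.

```scala
import scala.collection.mutable

// In-memory model of delete-by-id semantics as reported by isFound():
// true if a document with the id existed and was removed, false otherwise.
object DeleteByIdModel {

  def deleteById(store: mutable.Map[String, String], id: String): Boolean =
    store.remove(id).isDefined

  def main(args: Array[String]): Unit = {
    val store = mutable.Map("1" -> "tweet 1", "2" -> "tweet 2")
    println(deleteById(store, "1")) // prints true: the document existed
    println(deleteById(store, "1")) // prints false: it is already gone
  }
}
```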

We can also delete an index from our node. For this I have created the deleteIndex() method, which takes the client and the index name as arguments and returns the acknowledgement as a Boolean.

Here is the complete deleteIndex() method.

  def deleteIndex(client: Client, indexName: String): Boolean = {

    val deleteIndexRequest = new DeleteIndexRequest(indexName)
    val deleteIndexResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
    deleteIndexResponse.isAcknowledged()
  }

Here is the complete application.

 package com.narayan

import org.elasticsearch.node.NodeBuilder._
import org.elasticsearch.client.Client._
import org.elasticsearch.node.Node._
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import scala.io.Source
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentFactory._
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.query.FilterBuilders._
import org.elasticsearch.action.search.SearchType
import org.elasticsearch.client.Client
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.update.UpdateResponse
import org.elasticsearch.search.sort.SortOrder
import org.elasticsearch.action.search.SearchResponse

/**
 * @author narayan
 *
 */
trait ESOperation {

  /**
   * This is getClient method which returns java API client
   *
   * @return
   */
  def getClient(): Client = {
    val node = nodeBuilder().local(true).node()
    val client = node.client()
    client
  }

  // The root object name must match the type the documents are indexed
  // under ("tweet"), otherwise the _timestamp settings do not apply to them.
  val mappingBuilder = (jsonBuilder()
    .startObject()
    .startObject("tweet")
    .startObject("_timestamp")
    .field("enabled", true)
    .field("store", true)
    .field("path", "post_date")
    .endObject()
    .endObject()
    .endObject())

  /**
   * This is addMappingToIndex method which provides settings and mappings to the index and creates it
   *
   * @param indexName
   * @param client
   * @return
   */
  def addMappingToIndex(indexName: String, client: Client): CreateIndexResponse = {

    val settingsStr = ImmutableSettings.settingsBuilder().
      put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()
    client.admin().indices().prepareCreate(indexName)
      .setSettings(settingsStr)
      // register the mapping for the "tweet" type used by insertBulkDocument()
      .addMapping("tweet", mappingBuilder).execute()
      .actionGet()

  }

  /**
   * This is insertBulkDocument method which takes each document from file and insert into index
   *
   * @param client
   * @return
   */
  def insertBulkDocument(client: Client): BulkResponse = {
    val bulkJson = Source.fromFile("src/main/resources/bulk.json").getLines().toList
    val bulkRequest = client.prepareBulk()
    for (i <- 0 until bulkJson.size) {
      bulkRequest.add(client.prepareIndex("twitter", "tweet", (i + 1).toString).setSource(bulkJson(i)))
    }
    bulkRequest.execute().actionGet()
  }

  /**
   * This is update index method which updates particular document by add one more field
   *
   * @param client
   * @param indexName
   * @param typeName
   * @param id
   * @return
   */
  def updateIndex(client: Client, indexName: String, typeName: String, id: String): UpdateResponse = {

    val updateRequest = new UpdateRequest(indexName, typeName, id)
      .doc(jsonBuilder()
        .startObject()
        .field("gender", "male")
        .endObject())
    client.update(updateRequest).get()
  }

  /**
   * This is sortByTimeStamp method provides sorted document on the basis of time stamp
   *
   * @param client
   * @param indexName
   * @return
   */
  def sortByTimeStamp(client: Client, indexName: String): SearchResponse = {
    val filter = andFilter(rangeFilter("post_date").from("2015-05-13") to ("2015-05-19"))
    val sortedSearchResponse = client.prepareSearch().setIndices(indexName)
      .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filter))
      .setSize(2).addSort("post_date", SortOrder.DESC).execute().actionGet()
    sortedSearchResponse
  }

  /**
   * This is deleteDocumentById method which removes particular document from index
   *
   * @param client
   * @param indexName
   * @param typeName
   * @param id
   * @return
   */
  def deleteDocumentById(client: Client, indexName: String, typeName: String, id: String): DeleteResponse = {

    // Use the method's arguments instead of hard-coded index, type, and id
    val delResponse = client.prepareDelete(indexName, typeName, id)
      .execute()
      .actionGet()
    delResponse
  }

  /**
   * This is deleteIndex method which takes client and index as parameter and delete index from node
   *
   * @param client
   * @param indexName
   * @return
   */
  def deleteIndex(client: Client, indexName: String): Boolean = {

    val deleteIndexRequest = new DeleteIndexRequest(indexName)
    val deleteIndexResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
    deleteIndexResponse.isAcknowledged()
  }

}

We can run this application by extending the ESOperation trait in our main object.

Here is the main object.

  package com.narayan

object CrudOnElasticSearch extends ESOperation with App {

  val client = getClient()
  val mappingResponse = addMappingToIndex("twitter", client)
  println("@@@@@ mapping response is " + mappingResponse.isAcknowledged())
  val bulkInsertResponse = insertBulkDocument(client)
  println("@@@@@@ number of documents inserted by bulk request  is " + bulkInsertResponse.getItems.length)
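  // Refresh the index so the bulk-indexed documents are visible to search,
  // instead of sleeping and hoping the periodic refresh has already run
  client.admin().indices().prepareRefresh("twitter").execute().actionGet()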
  val sortedResult = sortByTimeStamp(client, "twitter")
  println("@@@@@@ sorted by time stamp and returns number filtered document " + sortedResult.getHits.getHits.length)
  val updateResponse = updateIndex(client, "twitter", "tweet", "1")
  println("@@@@@@@@ update response document version is " + updateResponse.getVersion)
  val deleteDocument = deleteDocumentById(client, "twitter", "tweet", "1")
  println("@@@@@ deleted document by id is " + deleteDocument.isFound())
  val deleteIndexResponse = deleteIndex(client, "twitter")
  println("@@@@@ delete index response " + deleteIndexResponse)
}

After this, run the application with sbt run. The results are printed to the console.

Alternatively, we can check the data with a curl request. Before doing so, comment out the deleteIndex() call. deleteIndex() removes the index from the node, so the request would otherwise fail with an index missing exception.

curl -XGET 'http://localhost:9200/twitter/tweet/_search?pretty'

And here is the output of the curl request.

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 9,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "4",
      "_score" : 1.0,
      "_source":{ "id": 4, "source": "twitter", "data": "tweet 4" , "post_date": "2015-05-15"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "9",
      "_score" : 1.0,
      "_source":{ "id": 9, "source": "twitter", "data": "tweet 9" , "post_date": "2015-05-20"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "5",
      "_score" : 1.0,
      "_source":{ "id": 5, "source": "twitter", "data": "tweet 5" , "post_date": "2015-05-16"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "6",
      "_score" : 1.0,
      "_source":{ "id": 6, "source": "twitter", "data": "tweet 6" , "post_date": "2015-05-17"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "2",
      "_score" : 1.0,
      "_source":{ "id": 2, "source": "twitter", "data": "tweet 2" , "post_date": "2015-05-13"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "7",
      "_score" : 1.0,
      "_source":{ "id": 7, "source": "twitter", "data": "tweet 7" , "post_date": "2015-05-18"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "3",
      "_score" : 1.0,
      "_source":{ "id": 3, "source": "twitter", "data": "tweet 3" , "post_date": "2015-05-14"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "8",
      "_score" : 1.0,
      "_source":{ "id": 8, "source": "twitter", "data": "tweet 8" , "post_date": "2015-05-19"}
    }, {
      "_index" : "twitter",
      "_type" : "tweet",
      "_id" : "10",
      "_score" : 1.0,
      "_source":{ "id": 10, "source": "twitter", "data": "tweet 10" , "post_date": "2015-05-21"}
    } ]
  }
}

You can explore more of the API in the Elasticsearch Java API documentation.

Download the source code from GitHub to check the functionality.


About Narayan Kumar

Software Consultant at Knoldus Software LLP.
This entry was posted in Scala.
