ElasticSearch: How to index data in bulk in elasticsearch with scala using java Bulk API.


Elasticsearch is an open-source, restful, distributed, search engine built on top of apache-lucene.In this post, we will learn to use elasticsearch java api in Scala to index data using BulkRequest.

we will begin with adding dependency of elasticsearch in the project. At the time 0.19.8 was the latest. The artifact is available on typesafe repository. Here is the snippet in build.sbt file.

name := “ElasticSearchBulkproject”

version := “0.1.0”

scalaVersion := “2.9.2”

resolvers += “Typesafe Repo” at “http://repo.typesafe.com/typesafe/releases/”

libraryDependencies += “org.elasticsearch” % “elasticsearch” % “0.19.8”

Elasticsearch is schemaless. We can index any json to it. We have a bulk json file, each line is a json. For our implementation: Application reads file line by line and add json to bulkRequest

Here is the bulk json which we need to index. Every line represents a json.
{ “id”: 1, “source”: “wordpress”, “data”: “document 1” }
{ “id”: 2, “source”: “wordpress”, “data”: “document 2” }
{ “id”: 3, “source”: “wordpress”, “data”: “document 3” }
{ “id”: 4, “source”: “wordpress”, “data”: “document 4” }
{ “id”: 5, “source”: “wordpress”, “data”: “document 5” }
{ “id”: 6, “source”: “wordpress”, “data”: “document 6” }

Firstly We need to create node in scala using java api which will interact with elasticsearch server running on our machine.

val node = nodeBuilder().client(true).node()

We then create a client from node created.

val client = node.client()

After this we will create an indexRequest to create an index named “wordpress” using createIndexRequest.

val indexRequest = new CreateIndexRequest("wordpress")

if we have elasticsearch servers running on other nodes, then we can also specify number of shards and replicas parameters using ImmutableSettings.settingsBuilder

ImmutableSettings.settingsBuilder().put("index.number_of_shards", 13).put("index.number_of_replicas", 1).build()

here is the code for creating index with specified properties.

indexRequest.settings(ImmutableSettings.settingsBuilder()
.put("index.number_of_shards",13).
put("index.number_of_replics", 1).build())
client.admin().indices().create(indexRequest).actionGet()

Now we will create a bulkRequest.


val bulkRequest = client.prepareBulk()

in add method of bulkRequest we can pass prepareIndex method call as parameter and setSource as json as below.


 bulkRequest.add(client.prepareIndex("wordpress", "wordpressStream", i.toString).setSource(bulkJson(i)))

in the prepareIndex call on client we pass three parameters

1) name of the index: “wordpress” in our case

2) document type: “wordpressStream” in our case

3) unique id: an unique id for each document to be indexed By using this unique id we will be able to search for particular document residing on particular index.

Now start elasticsearch by executing following command.

elasticsearch -f

combning all the peices together we have following code .

package knoldus.com.elasticsearch

import org.elasticsearch.node.NodeBuilder.nodeBuilder
import scala.io.Source
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.client.Client

object ElasticSearchBulkRequestExample extends App {
  
  val node = nodeBuilder().client(true).node()
  val client = node.client()

  createIndexWithProperties("wordpress")

  // we are creating a list of json from bulk.json file by using scala io
  val bulkJson = Source.fromFile("src/main/resources/bulk.json").getLines().toList

  val bulkRequest = client.prepareBulk()

  // in for loop we are adding  prepareindex call to bulkRequest for each json
  for (i <- 1 to bulkJson.size) {
   bulkRequest.add(client.prepareIndex("wordpress", "wordpressStream", i.toString).setSource(bulkJson(i)))
  }

  // execute bulkRequest
  bulkRequest.execute().actionGet()
  client.close()
// this method create index with specified properties such as number_of_shards and number_of_replicas
  
  def createIndexWithProperties(indexName: String) = {
   val indexRequest = new CreateIndexRequest(indexName)
   indexRequest.settings(ImmutableSettings.settingsBuilder().
   put("index.number_of_shards", 13).put("index.number_of_replicas", 1).build())
   client.admin().indices().create(indexRequest).actionGet()
  }
}

Now we can check data using curl request.

curl -XGET http://localhost:9200/wordpress/wordpressStream/1

here “http://localhost:9200 is url:port of elasticsearch. “wordpress” is name of index and “wordpressStream” is document type and last one is the id.

And here is the output of the curl request.

{“_index”:”wordpress”,”_type”:”wordpressStream”,”_id”:”1″,”_version”:3,”exists”:true, “_source” : { “id”: 1, “source”: “wordpress”, “data”: “document 1” }}

About Piyush Mishra

Software Consutant
This entry was posted in Web. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s