If you have ever needed to index large records into Elasticsearch, you might have wondered: “What could be the fastest way to do it”? In my blog, I will try to put a scenario where it gets good to better. I believe you are familiar with the basics of Apache Spark and Elasticsearch. It is not a necessity to use Spark if you do not want to but whenever I need to do something fast and in parallel, It’s my favorite tool.
Use Case
I need to pull employee records from Postgres DB and index it into Elasticsearch Indexes as fast as possible.I am using wabisabi which is a lightweight ES REST client. you can include it in the by following the instruction on the link. This is how I created an instance of ElasticSearch client.
The worst case
I started with an Elasticsearch client to index each record one by one:
Although, I get my spark job done without any failure and but I am not getting all of my records into Elasticsearch. Only 2k documents out of 100k were ingested successfully.
Why documents were ingested partially?
The `client.index` method returns Future[Respons] which is not guaranteed to be completed successfully inside a Spark `.foreach` action. Lets put an Await then!!
The reliable, but bad solution
The below code snippet gives a guarantee on the spark job that future is completed or failed within specified time!
Still, It takes a lot of time to be indexed each record one by one.
Using Bulk Indexing in Elastic Style to reduce index time!
Instead of indexing each record individually, we can form a bulk index request of a larger batch (in my case, it worked best with indexing 1000 employees at once!). Below is how I constructed an Elasticserach bulk request to be sent with the bulk request method of the ES client in use:
The Better approach: Using Spark Native Plugin for Indexing
Elasticsearch support plugins for Apache Spark to allow indexing or saving the existing Dataframe or Dataset as elasticsearch index. Here is how.
1. Include elasticsearch-hadoop
as a dependency:
Remember the version might vary according to the version of spark and elasticsearch.
"org.elasticsearch" %% "elasticsearch-spark-20" % "6.5.1",
2. Configure Elasticsearch settings:
es.index.auto.create
If set to “true”, it will create an index not exists already while saving the data to elastic search.
Caution! If you want to provide special settings and mappings for the index being created, you can create it in advance prior to saving any data in elasticsearch. If the option es.index.auto.create
is set to “false”, it will give exception while saving data for a non-existent index.
es.nodes
sets elasticsearch host. The port has defaulted to “9200”es.batch.size.entries
Size (in entries) for batch writes using Elasticsearch bulk API – (0 disables it).
Things to keep in mind while using Bulk API
A detailed sustainable throughput can be achieved by reference the benchmarking page for REST clients. On my local machine, I could see best results when indexing 1000 docs/second. Increasing more than that increases the wait time for the client to get the response. To verify the same, you can use your own HTTP client with Await method.
Saving data to Elasticsearch
Once the configuration is there in spark config, simply provide the format and call the method .save()
on the Dataframe or Dataset we are using. Given the sparkSession as spark
, The below code snippet is reading employee records from the employee table and indexing the records directly into Elasticsearch:
Beware of Duplicates!!
There are options which we’ve used in the code snippet above prior to calling .save()
on the dataset. Let us go by the Options one by one.
1. es.mapping.id
sets the id field of the document being Indexed in the Elasticsearch. It is very important to provide a mapping Id field while saving to an existing Index which has data already and we are performing an update/index operation. If this option is missing, .save()
will populate the document with random Ids which will be duplicated each time an update is performed on the same document. e.g. It will yield like below results for the same document {“id”: 1, “name”: “Manish”}:
2. mode
: Has options “Append” will append on the existing source, “Overwrite” will replace the existing data if any. “ErrorIfExists” will throw an exception and “Ignore” will not save the data at all if there is already data with the index.
With these, I would like to conclude this post. I hope you enojoyed the read. Thanks for reading!!