Spark Structured Streaming with Elasticsearch

We have been working with streaming data for a long time, and Apache Spark makes that much more convenient. Spark provides two APIs for streaming data: Spark Streaming, which is a separate library provided by Spark, and Structured Streaming, which is built on top of the Spark SQL library. We will discuss the trade-offs and differences between these two libraries in another blog. Today, we’ll focus on saving streaming data to Elasticsearch using Spark Structured Streaming. Elasticsearch added support for Spark Structured Streaming (Spark 2.2.0 onwards) in version 6.0.0 of the “Elasticsearch For Apache Hadoop” dependency. We will use these versions or higher to build our sbt Scala project.

Pre-requisites

First, add the “Spark SQL” and “Elasticsearch For Apache Hadoop” dependencies to your build.sbt.

Alternatively, look up Elasticsearch For Apache Hadoop and Spark SQL in the Maven repository and pick suitable versions. Note that the version should be at least 6.0.0 for “Elasticsearch For Apache Hadoop” and 2.2.0 for “Spark SQL”.
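A minimal build.sbt sketch with both dependencies, assuming Scala 2.11 (the Scala version Spark 2.2.0 is built for) and the minimum versions mentioned above. Adjust the versions to match your Spark and Elasticsearch cluster:

```scala
// build.sbt (sketch): minimum versions that support Structured Streaming to Elasticsearch
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Structured Streaming ships as part of Spark SQL
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  // "Elasticsearch For Apache Hadoop": published without a Scala suffix, hence a single %
  "org.elasticsearch" % "elasticsearch-hadoop" % "6.0.0"
)
```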

Let’s get started with the code

In this code, we will read a JSON file and save its data to Elasticsearch. But first, let’s go through the basic code to read a JSON file as a DataFrame. For that, we need to create a schema for our JSON, since Structured Streaming file sources do not infer the schema by default.
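A sketch of such a schema. The actual JSON file is not shown in this post, so the fields `id`, `name` and `age` and the object name `JsonSchema` are hypothetical placeholders; replace them with the fields of your own documents:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object JsonSchema {
  // Hypothetical fields: one StructField per key in each JSON record.
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))
}
```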

Now we can use this schema to read the JSON file in streaming mode with the following code:
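A sketch of the streaming read. It assumes a hypothetical input directory `src/main/resources/json-input` (the file source monitors a directory and picks up new files placed in it) and repeats placeholder schema fields so the snippet compiles on its own; `ReadJsonStream` is likewise an illustrative name:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ReadJsonStream {
  // Hypothetical directory; each new file should hold one JSON object per line.
  val inputDir = "src/main/resources/json-input"

  // Placeholder schema; use the schema you defined for your own JSON.
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("age", IntegerType)
  ))

  def streamingDF(spark: SparkSession): DataFrame = {
    // Spark checks that the input path exists when the stream is defined.
    Files.createDirectories(Paths.get(inputDir))
    spark.readStream
      .schema(schema) // explicit schema, since file sources don't infer it by default
      .json(inputDir)
  }
}
```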

Now we will send the data in this “streamingDF” to a destination. Let’s use the console first to verify the data.
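A minimal sketch of the console sink, written as a hypothetical helper `ConsoleSink.start` that accepts any streaming DataFrame, such as `streamingDF`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object ConsoleSink {
  // Starts a query that prints every micro-batch of the stream to stdout.
  def start(streamingDF: DataFrame): StreamingQuery =
    streamingDF.writeStream
      .format("console")               // built-in sink, handy for debugging
      .outputMode(OutputMode.Append()) // emit only newly arrived rows
      .option("truncate", "false")     // print full column values
      .start()
}
```

For example, `ConsoleSink.start(streamingDF).awaitTermination()` keeps printing new micro-batches until the application is stopped.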

Here, .format("console") tells Spark to print the result to the console. This code prints the JSON data from the resource file to the console.

That was just a simple Structured Streaming job with a JSON file as the source and the console as the destination.

Elasticsearch as the destination in streaming

To make Elasticsearch the destination for Spark Structured Streaming, we need to add the Elasticsearch configuration to the SparkSession object.
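A sketch of the session configuration, assuming a local cluster on the default port 9200. The `es.*` keys are elasticsearch-hadoop settings; the username `elastic`, the password `changeme` and the helper name `EsSparkSession` are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object EsSparkSession {
  // Local SparkSession carrying the elasticsearch-hadoop connection settings.
  def create(): SparkSession =
    SparkSession.builder()
      .master("local[*]")
      .appName("structured-streaming-to-elasticsearch")
      .config("es.nodes", "localhost")             // Elasticsearch host(s)
      .config("es.port", "9200")                   // HTTP port
      .config("es.net.http.auth.user", "elastic")  // placeholder username
      .config("es.net.http.auth.pass", "changeme") // placeholder password
      .config("es.index.auto.create", "true")      // create the index on first write
      .getOrCreate()
}
```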

Here we add the authentication credentials along with the Elasticsearch nodes and port. For testing, the address currently points to the local machine.

Now, in the previous code, we need to set Elasticsearch as the destination of the DataStreamWriter.
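A sketch of the Elasticsearch sink, written as a hypothetical helper `ElasticsearchSink.start`. The sink accepts only append output mode (the Elastic forum thread listed in the references discusses this), and Spark needs a checkpoint location for it unless `spark.sql.streaming.checkpointLocation` is set globally. The directory `/tmp/es-checkpoint` and the type name `doc` are placeholders; the type is there because ES-Hadoop 6.x expects resources in `index/type` form:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object ElasticsearchSink {
  // Placeholder directory where Spark records stream progress, so a
  // restarted query resumes where it left off.
  val checkpointDir = "/tmp/es-checkpoint"

  def start(streamingDF: DataFrame): StreamingQuery =
    streamingDF.writeStream
      .format("org.elasticsearch.spark.sql")       // elasticsearch-hadoop sink
      .outputMode(OutputMode.Append())             // the only mode this sink accepts
      .option("checkpointLocation", checkpointDir)
      .start("index-name/doc")                     // "index/type"; "doc" is a placeholder
}
```

Call `awaitTermination()` on the returned query to keep the application running while documents are indexed.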

Here we changed the format from console to org.elasticsearch.spark.sql, which tells Spark that the streaming destination is now Elasticsearch.

Now the code is complete. Running it saves the JSON data to Elasticsearch. To verify, use curl:

curl http://localhost:9200/index-name/_search

This returns the documents saved to the index index-name (by default, _search returns only the first 10 hits). The following documents were saved to Elasticsearch:

[Screenshot: search results for index-name]

You can find the whole code here. If you have any questions, please comment below.

I hope that helped.
Thanks 🙂

References:

  • https://discuss.elastic.co/t/spark-structured-streaming-sink-in-append-mode/105664



Written by 

Anuj Saxena is a software consultant with more than 1.5 years of experience. Anuj has worked with functional programming languages like Scala and functional Java, and is also familiar with other languages such as Java, C, C++ and HTML. He currently works on reactive technologies like Spark, Kafka, Akka, Lagom and Cassandra, and has used DevOps tools like DC/OS and Mesos for deployments. His hobbies include watching movies and anime, and he also loves travelling.
