Many of us have been working with streaming data for quite some time, and Apache Spark makes that work much more convenient. Spark provides two APIs for streaming data: Spark Streaming, which is a separate library, and Structured Streaming, which is built on top of the Spark SQL library. We will discuss the trade-offs and differences between these two libraries in another blog. Today we’ll focus on saving streaming data to Elasticsearch using Spark Structured Streaming. Elasticsearch added support for Spark Structured Streaming (2.2.0 onwards) in version 6.0.0 of the “Elasticsearch For Apache Hadoop” dependency. We will be using these versions or higher to build our sbt Scala project.
First, you need to add the “Spark SQL” dependency:
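In build.sbt, that looks something like this (the version shown is illustrative; anything 2.2.0 or higher works):

```scala
// Spark SQL, which Structured Streaming is built on (needs 2.2.0+)
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
```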
and the “Elasticsearch For Apache Hadoop” dependency to your build.sbt:
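Something along these lines (the version is illustrative; 6.0.0 or higher is required, and the `elasticsearch-spark-20` artifact targets Spark 2.x):

```scala
// Elasticsearch For Apache Hadoop connector for Spark 2.x (needs 6.0.0+)
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
```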
Alternatively, you can go to the Maven repository for “Elasticsearch For Apache Hadoop” and “Spark SQL” and pick a suitable version. Note that the version should be at least 6.0.0 for “Elasticsearch For Apache Hadoop” and 2.2.0 or higher for “Spark SQL”.
Let’s get started with the code.
In this code, we will be reading a JSON file and saving its data to Elasticsearch. But first, let’s go through the basic code to read a JSON file as a DataFrame; for that, we need to define a schema for our JSON.
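A minimal sketch of such a schema; the field names `id`, `name`, and `age` are hypothetical and stand in for whatever fields your JSON records actually have:

```scala
import org.apache.spark.sql.types._

// Hypothetical schema matching JSON records like {"id": 1, "name": "Alice", "age": 25}
val schema = StructType(
  Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  )
)
```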
Now we can use this schema to read the JSON file in streaming mode with the following code:
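A sketch, assuming the schema defined above and JSON files placed in a hypothetical `src/main/resources/jsondata` directory:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StructuredStreamingToES")
  .getOrCreate()

// Read JSON files arriving in the source directory as a stream,
// using the schema defined earlier (the path is a placeholder)
val streamingDF = spark.readStream
  .schema(schema)
  .json("src/main/resources/jsondata")
```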
Now, with this “streamingDF”, we will send the data to a destination. Let’s use the console first to verify the data.
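A sketch of the console sink, assuming the `streamingDF` from above:

```scala
// Print each micro-batch of streamingDF to the console for verification
val query = streamingDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

// Block until the stream is stopped manually
query.awaitTermination()
```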
Here, .format(“console”) tells Spark to print the results to the console. This code prints the JSON data from the resource file on the console:
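With the hypothetical records used in this post, a micro-batch looks roughly like this on the console:

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
|  2|  Bob| 30|
+---+-----+---+
```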
That was just a simple Structured Streaming job where a JSON file was the source and the console was the destination.
Elasticsearch as the destination in streaming
Now, to add Elasticsearch as the destination for Spark Structured Streaming, we need to add the Elasticsearch configuration to the SparkSession object:
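A sketch of that configuration; the credentials and address below are placeholders for a local test cluster, and the `es.*` keys come from the “Elasticsearch For Apache Hadoop” configuration:

```scala
import org.apache.spark.sql.SparkSession

// Elasticsearch connection settings; host, port, user, and password
// are placeholder values for a local test cluster
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StructuredStreamingToES")
  .config("es.nodes", "localhost")
  .config("es.port", "9200")
  .config("es.net.http.auth.user", "elastic")
  .config("es.net.http.auth.pass", "changeme")
  .getOrCreate()
```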
Here we are adding the authentication credentials along with the Elasticsearch nodes and port. For now, the address points to the local machine for testing.
Now, building on the previous code, we need to set Elasticsearch as the destination for the DataStreamWriter:
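A sketch under the same assumptions; the index/type resource name and the checkpoint directory are placeholders, and note that the Elasticsearch sink requires a checkpoint location:

```scala
// Stream the data into Elasticsearch; "index-name/doc-type" and the
// checkpoint path are placeholder values
val query = streamingDF.writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .option("checkpointLocation", "/tmp/es-checkpoint")
  .start("index-name/doc-type")

query.awaitTermination()
```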
Here we changed the format from console to org.elasticsearch.spark.sql, which tells Spark that the streaming destination is now Elasticsearch.
Now the code is complete. Running it will save the JSON data to Elasticsearch. To make sure it did, use curl:
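For example (the credentials, host, and index name below match the placeholder configuration used earlier):

```shell
# Fetch all documents from the index; values are placeholders for a local cluster
curl -u elastic:changeme "http://localhost:9200/index-name/_search?pretty"
```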
This will show all the documents saved to the index, index-name. The following got saved to Elasticsearch:
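A hit in the response looks roughly like this (the document IDs are auto-generated, and the record shown is one of the hypothetical ones from earlier):

```
{
  "_index" : "index-name",
  "_type" : "doc-type",
  "_id" : "x8GRk2IBW2...",
  "_score" : 1.0,
  "_source" : {
    "id" : 1,
    "name" : "Alice",
    "age" : 25
  }
}
```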
You can find the whole code here. In case of any queries please comment below.
I hope that helped.