In two previous blogs, we explored about Vertica and how it can be connected to Apache Spark. The first blog in this mini series was about reading data from Vertica using Spark and saving that data into Kafka. The next blog explained the reverse flow i.e. reading data from Kafka and writing data to Vertica but in a batch mode. i.e reading data from Kafka in a batch and then saving that batch into Vertica. Wouldn’t it be better if we can do the same thing in streaming mode. So today we’ll try to do the same thing with Structured Streaming.
But what is Structured Streaming?? In short words, the Structured Streaming is the streaming of structured data in Spark built on the Spark SQL engine. This is different than the Spark Streaming library. For more information on Structured Streaming you can read the following blogs: Spark Streaming V Structured Streaming, Exploring Structured Streaming.
So let’s get started with it.
First we need to add the required dependencies. For this part, the setup part of the last blog can be read as reference.
First thing is to create a SparkSession to connect with Spark and Spark-SQL:
val sparkSession = SparkSession.builder() .config(new SparkConf().setAll(config)) .appName(appName) .master(master) .getOrCreate()
Here “appName” would be the name you want to set to your Spark application and “master” would be the master URL for spark. Here we are running Spark in local mode hence
master = "local" //4 is number of cores to be used
For reading data from Kafka we need the following:
def kafkaOptions: Map[String, String] = Map( "kafka.bootstrap.servers" -> brokers, //address of Kafka brokers "group.id" -> groupId, //group id for kafka consumers "startingOffsets" -> "earliest", //starting offsets to start picking data "subscribe" -> sourceTopic //the topic from which the data will be consumed ) val kafkaSource: String = "kafka"
Now we need the code which will consume the data from Kafka in streaming mode:
val dataFrame = sparkSession.readStream.format(kafkaSource).options(kafkaOptions).load()
The above code is to read the data and create the streaming DataFrame. Now we have to start the preparations for writing this DataFrame to Vertica.
To save the data to Vertica, first we need some credentials to connect to database and some other properties:
val verticaProperties: Map[String, String] = Map( "db" -> "db", // Database name "user" -> "user", // Database username "password" -> "password", // Password "table" -> "table", // vertica table name "dbschema" -> "dbschema", // schema of vertica where the table will be residing "host" -> "host", // Host on which vertica is currently running "hdfs_url" -> "hdfs_url", // HDFS directory url in which intermediate orc file will persist before sending it to vertica "web_hdfs_url" -> "web_hdfs_url" // FileSystem interface for HDFS over the Web )
One last thing we need is a data source to provide to Spark for Vertica:
val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
Now comes the tricky part. Although Structured streaming supports multiple data sources to read and write data but sadly Vertica is not one of them. Vertica doesn’t provide a source which can be used to write a streaming dataFrame to Vertica. But luckily, Spark 2.4.x onwards, we get a method called foreachBatch which provides us a batched dataFrame result of each trigger. This batched dataFrame then can be saved to Vertica using the batch mode. Hence for the writing part, the following code will do the trick:
val saveToVertica: DataFrame => Unit = dataFrame => dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save() val streamingQuery = dataFrame.writeStream.outputMode(OutputMode.Append) .option("checkpointLocation", checkpointLocation) .foreachBatch((ds, _) => saveToVertica(ds)).start()
Here the mode is set as “append” (can be “append” / “complete” / “update”) to append the new data into existing data. Another thing to notice is the option “checkpointLocation”. This option sets the checkpoints which tells us in case of failures or restarts from where to pick the data. The above code will create a streaming query which will save the data from each micro batch of the stream into Vertica.
Now as this is a streaming job which means it should continue running. For that:
streamingQuery.awaitTermincation() //will wait for the external termination call
One last thing is to stop SparkSession after all the operations:
So this was it for using Structured Streaming for writing data into Vertica and with this the mini series for “Using Vertica with Spark Kafka” comes to an end. If you want another blog or more information on these topics please let me know in the comments.
Please like, share and subscribe 😉