In the previous blog of this series, we took a quick look at the basic definitions of Spark and Vertica. We also walked through the code for reading data from Vertica with Spark as a DataFrame and saving that data into Kafka. In this blog we will cover the reverse flow: reading data from Kafka as a DataFrame and writing that DataFrame into Vertica.
Looks pretty simple, right? That is exactly what I thought when I was getting started with this. So let’s get right into it.
In Vertica, there are two types of storage:
- WOS: write optimized store
- ROS: read optimized store
Usually, when data is loaded, it first goes into WOS (unless you use the COPY DIRECT statement). Records in WOS are stored without compression or indexing to support faster loading. Because the data in WOS is sorted only when queried, it is not optimized for reading. Vertica then moves data from WOS to ROS using the Tuple Mover, which performs two operations: moveout, which moves data from WOS to ROS, and mergeout, which consolidates ROS containers.
The path through WOS is meant for loads of 100 MB or less (trickle loads). For heavier loads, the COPY statement has a DIRECT option that writes data straight to ROS. Since this blog deals with big data movements, we will focus on COPY with DIRECT.
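To make the difference concrete, here is a minimal Scala sketch that builds both flavours of the COPY statement as plain strings. The `copyStatement` helper, the table name and the file path are hypothetical, and the `DIRECT` keyword assumes a Vertica version that still has WOS.

```scala
// Hypothetical helper: builds a Vertica COPY statement as a string.
// With direct = true, the DIRECT keyword asks Vertica to load straight into ROS.
def copyStatement(table: String, path: String, direct: Boolean): String = {
  val loadMethod = if (direct) " DIRECT" else ""
  s"COPY $table FROM '$path' DELIMITER ','$loadMethod;"
}

// Illustrative table name and file path, not taken from a real deployment.
val trickleLoad = copyStatement("public.events", "/data/events.csv", direct = false)
val bulkLoad    = copyStatement("public.events", "/data/events.csv", direct = true)

println(trickleLoad) // COPY public.events FROM '/data/events.csv' DELIMITER ',';
println(bulkLoad)    // COPY public.events FROM '/data/events.csv' DELIMITER ',' DIRECT;
```

The only difference between the two statements is the trailing `DIRECT`, which is what skips WOS for large loads.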
So let’s move on to connecting Spark with Vertica. With Spark, there are two ways to use COPY with DIRECT to save data to Vertica:
- Direct method: Copying data directly from Spark to Vertica.
- Indirect method: First copy data to HDFS and then copy that data from HDFS to Vertica.
The direct method may look better at first: it has no intermediate dependency, and because the data travels straight to Vertica you might expect lower latency. Believe me, it is not, so we will use the indirect method to connect Spark with Vertica.
Problems with the direct approach:
- This method uses Spark's JDBC connector to save the DataFrame into Vertica. The JDBC path does not handle Spark task failures well and can introduce duplicate data in Vertica as a result.
- This method is also slow at copying data to Vertica.
Why use the indirect approach?
- This method uses a dedicated Spark connector instead of the JDBC connector, so it can cope with Spark task failures and keep the write to Vertica atomic.
- In this method Spark first copies the data to HDFS as ORC files, and then Vertica natively runs its built-in COPY command to fetch the data from HDFS, which is supposed to be very fast. Hence this method is faster than the direct one.
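The duplicate-data problem can be illustrated without Spark or Vertica at all. The sketch below is a toy simulation, not the connector's actual implementation: `Table`, `writeRowByRow` and `writeStaged` are hypothetical names, and the failure is injected by hand. It shows how retrying a row-by-row write after a mid-task failure leaves duplicates, while staging the rows first and committing them in one step does not.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Toy stand-in for a target table: just an in-memory buffer of rows.
final class Table { val rows: ArrayBuffer[Int] = ArrayBuffer.empty[Int] }

// Row-by-row write: each row is visible as soon as it is written.
// failAfter simulates a task crashing once that many rows have been written.
def writeRowByRow(table: Table, data: Seq[Int], failAfter: Option[Int]): Unit =
  data.zipWithIndex.foreach { case (row, i) =>
    if (failAfter.contains(i)) throw new RuntimeException("simulated task failure")
    table.rows += row
  }

// Staged write: rows go to a staging area first and reach the table in one step.
def writeStaged(table: Table, data: Seq[Int], failAfter: Option[Int]): Unit = {
  val staging = new Table
  writeRowByRow(staging, data, failAfter) // a failure here never touches `table`
  table.rows ++= staging.rows             // single "commit" step
}

val data = Seq(1, 2, 3, 4)

// First attempt fails after two rows, then the task is retried from scratch.
val direct = new Table
Try(writeRowByRow(direct, data, failAfter = Some(2)))
writeRowByRow(direct, data, failAfter = None)

val staged = new Table
Try(writeStaged(staged, data, failAfter = Some(2)))
writeStaged(staged, data, failAfter = None)

println(direct.rows.toList) // List(1, 2, 1, 2, 3, 4)
println(staged.rows.toList) // List(1, 2, 3, 4)
```

In the indirect method, HDFS plays the role of the staging area and Vertica's COPY plays the role of the single commit step.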
So let’s get on with it.
The first thing we need to do is set up the project. The setup is described in the setup section of the previous blog in this series. In addition, we will need HDFS, since it is used as the intermediate storage.
The next thing is to create a SparkSession to connect with Spark and Spark SQL:
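import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .config(new SparkConf().setAll(config)) // config: extra Spark settings as key/value pairs
  .appName(appName)
  .master(master)
  .getOrCreate()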
Here “appName” is the name you want to give your Spark application and “master” is the master URL for Spark. We are running Spark in local mode, hence:
val master = "local[4]" // 4 is the number of cores to be used
For reading data from Kafka we need the following:
def kafkaOptions: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> brokers, // address of Kafka brokers
  "group.id" -> groupId,                // group id for Kafka consumers
  "startingOffsets" -> "earliest",      // starting offsets to start picking data
  "endingOffsets" -> "latest",          // the limit till where to pick the data and create a DataFrame
  "subscribe" -> sourceTopic            // the topic from which the data will be consumed
)
val kafkaSource: String = "kafka"
val dataFrame = sparkSession.read.format(kafkaSource).options(kafkaOptions).load()
The above code will read the data and create the DataFrame. Now we have to start the preparations for writing this DataFrame to Vertica.
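The DataFrame produced by the Kafka source carries `key` and `value` as binary columns (alongside `topic`, `partition`, `offset`, `timestamp` and `timestampType`), so they usually need to be cast before further processing. Here is a minimal sketch, assuming the payloads are UTF-8 strings. It builds a small in-memory DataFrame with the same `key`/`value` shape so that it can run without a Kafka broker; the session name and sample values are made up for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.SparkSession

// Local session used only for this illustration.
val spark = SparkSession.builder().appName("kafka-cast-sketch").master("local[1]").getOrCreate()
import spark.implicits._

// Stand-in for the Kafka DataFrame: key and value are byte arrays (BinaryType).
val kafkaLike = Seq(
  ("id-1".getBytes(UTF_8), "hello".getBytes(UTF_8)),
  ("id-2".getBytes(UTF_8), "world".getBytes(UTF_8))
).toDF("key", "value")

// Cast both binary columns to strings.
val decoded = kafkaLike.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

decoded.show()
```

The same `selectExpr` call could be applied to the `dataFrame` read from Kafka above.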
To save the data to Vertica, we first need the credentials to connect to the database, along with some other properties:
val verticaProperties: Map[String, String] = Map(
  "db" -> "db",                     // Database name
  "user" -> "user",                 // Database username
  "password" -> "password",         // Password
  "table" -> "table",               // Vertica table name
  "dbschema" -> "dbschema",         // Schema of Vertica where the table will be residing
  "host" -> "host",                 // Host on which Vertica is currently running
  "hdfs_url" -> "hdfs_url",         // HDFS directory URL in which the intermediate ORC file will persist before sending it to Vertica
  "web_hdfs_url" -> "web_hdfs_url"  // FileSystem interface for HDFS over the Web
)
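A missing or blank entry in this map is easy to overlook, so it can help to check the map before attempting the write. The following is a small sketch: `requiredKeys` and `missingKeys` are hypothetical helpers rather than part of the connector, and the host names, ports and paths are made-up placeholders.

```scala
// Keys used in this post's verticaProperties map.
val requiredKeys: Seq[String] =
  Seq("db", "user", "password", "table", "dbschema", "host", "hdfs_url", "web_hdfs_url")

// Hypothetical helper: returns the keys that are absent or blank.
def missingKeys(props: Map[String, String]): Seq[String] =
  requiredKeys.filter(key => props.get(key).forall(_.trim.isEmpty))

// Placeholder values for illustration only.
val exampleProperties: Map[String, String] = Map(
  "db" -> "analytics",
  "user" -> "dbadmin",
  "password" -> "secret",
  "table" -> "events",
  "dbschema" -> "public",
  "host" -> "vertica.example.com",
  "hdfs_url" -> "hdfs://namenode.example.com:8020/tmp/vertica-staging",
  "web_hdfs_url" -> "webhdfs://namenode.example.com:50070/tmp/vertica-staging"
)

println(missingKeys(exampleProperties))              // List()
println(missingKeys(exampleProperties - "hdfs_url")) // List(hdfs_url)
```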
One last thing we need is a data source to provide to Spark for Vertica:
val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
Now for the actual writing part, the following code will do the trick:
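A minimal sketch of that write, assuming the `dataFrame`, `verticaDataSource` and `verticaProperties` values defined earlier are in scope and that the Vertica Spark connector and the Vertica JDBC driver are on the classpath:

```scala
// Assumes dataFrame, verticaDataSource and verticaProperties from the earlier snippets.
dataFrame.write
  .format(verticaDataSource)  // Vertica connector data source
  .options(verticaProperties) // connection details and HDFS staging location
  .mode("append")             // save mode
  .save()
```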
Here the mode is set to “append” (it can be “append”, “overwrite” or “ignore”) so that the new data is appended to the existing data. The above code will save the data into Vertica.
One last thing is to stop SparkSession after all the operations:
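A minimal sketch of stopping the session. In this post's code the call would simply be `sparkSession.stop()`. Here a throwaway local session (the name `session` and the app name are illustrative) is created so that the snippet runs on its own, and the work is wrapped in `try`/`finally` so that the session is stopped even if an operation fails:

```scala
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().appName("stop-sketch").master("local[1]").getOrCreate()

try {
  // Placeholder for the real work: read from Kafka, write to Vertica.
  println(session.range(3).count())
} finally {
  session.stop() // stops the underlying SparkContext and releases its resources
}
```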
And yeah this was pretty much it for writing data to Vertica. Easy enough, huh!
In this blog, we reviewed the types of data storage in Vertica and how data flows through them while it is being saved. We also discussed the ways of saving data into Vertica using Spark, and used the Spark Vertica connector to save the data into Vertica. One thing to notice here is that we saved the data in batch mode, not in streaming mode. That’s still due. Hence, in our next installment, we’ll take a look at how we can do the same thing using Spark Structured Streaming.
For more, you can refer to this repository which implements the same thing in KafkaToVerticaApplication object.
Like, comment, share and stay tuned. 😉