We live in a world of big data, where even small results are often derived from very large datasets. This is a consequence of the rapid growth in data collection, and it creates a need for tools that can process data at that scale. You are probably already familiar with Apache Spark, one of the most common tools for processing big data in both batch and streaming computations. If not, there are several blogs available that give a good introduction to Spark, such as Spark DataSets, RDD vs DataFrames, and Spark Unconstructed.
Vertica is another tool that is very helpful when working with big data. In one line:
Vertica is a columnar storage platform designed to handle large volumes of data, which enables very fast query performance in traditionally intensive scenarios.
Now, what do we mean by Vertica being columnar storage? It means that Vertica stores data column by column rather than row by row. As a result, Vertica reads only the columns needed to answer a query, which reduces disk I/O and makes it ideal for read-intensive workloads. The following are some of the features Vertica provides:
- Column-oriented storage organization
- Standard SQL interface with many analytics capabilities built-in
- Compression to reduce storage costs
- Support for standard programming interfaces
- High performance and parallel data transfer
- Ability to store machine learning models and use them for database scoring
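To make the column-oriented idea concrete, here is a small plain-Scala sketch. It has nothing to do with Vertica's actual storage engine; the Sale type and its sample values are made up purely for illustration. It only shows why an aggregate over one column needs to touch less data when each column is stored on its own.

```scala
object ColumnarSketch {
  // Hypothetical record type, used only for this illustration.
  final case class Sale(id: Int, region: String, amount: Double)

  // Row-oriented layout: every record is stored whole.
  val rows: Vector[Sale] = Vector(
    Sale(1, "EU", 10.0),
    Sale(2, "US", 25.5),
    Sale(3, "EU", 4.5)
  )

  // Column-oriented layout: one contiguous sequence per column.
  val ids: Vector[Int] = rows.map(_.id)
  val regions: Vector[String] = rows.map(_.region)
  val amounts: Vector[Double] = rows.map(_.amount)

  // Equivalent of SELECT SUM(amount): the row layout walks whole records...
  def totalFromRows: Double = rows.map(_.amount).sum

  // ...while the column layout reads only the `amounts` column.
  def totalFromColumns: Double = amounts.sum

  def main(args: Array[String]): Unit = {
    println(totalFromRows)
    println(totalFromColumns)
  }
}
```

Both functions return the same total; the difference is only in how much data has to be read to get there.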
This blog focuses on reading data from Vertica using Spark and writing it to Kafka. So let’s get started.
First, add the following dependencies for Spark SQL and Spark-SQL-Kafka to your build.sbt:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
)
Also, from version 2.4.x onwards, Spark supports Scala 2.12.x, so the Scala version can be set to 2.12.x in this project.
To work with Vertica, we need the following two jars:
- vertica-jdbc driver jar
- vertica-spark connector
These jars aren’t available on Maven, so we have to add them to our SBT project manually. Luckily, Vertica ships these jars in the package we just installed, at the following paths:
Spark Connector path:
/opt/vertica/packages/SparkConnector/lib
Vertica JDBC client library path:
There are different connectors for different Spark versions, so this is where we choose which one to use. Since we are using the latest version of Spark, we’ll choose the latest version of the connector.
After deciding which connector to use, copy the jars to the project-root/lib/ folder and add the following line to your build.sbt to put the unmanaged jars on the classpath:
unmanagedJars in Compile ++= Seq(
  baseDirectory.value / "lib/.jar",
  baseDirectory.value / "lib/.jar"
)
That’s it. All required dependencies are there and we are now ready to start coding.
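Putting the pieces together, a build.sbt for this project might look roughly like the sketch below. The project name and the exact Scala patch version are assumptions, and instead of naming the two jars individually it simply picks up every jar under lib/.

```scala
// build.sbt (sketch)
name := "vertica-to-kafka"   // hypothetical project name
scalaVersion := "2.12.8"     // any 2.12.x release; the patch version is an assumption

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
)

// Add every jar copied into project-root/lib/ to the compile classpath.
unmanagedJars in Compile ++= ((baseDirectory.value / "lib") ** "*.jar").classpath
```

Note that sbt already treats jars placed directly in the lib/ folder as unmanaged dependencies by default, so the last setting mostly serves to make the intent explicit.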
Where’s the code?
To read data from Vertica, we first have to provide some properties and credentials to access it. The connector needs the following properties:
val properties: Map[String, String] = Map(
  "db" -> "db", // Database name
  "user" -> "user", // Database username
  "password" -> "password", // Password
  "table" -> "source.table", // Vertica table name
  "dbschema" -> "source.dbschema", // Vertica schema in which the table resides
  "host" -> "host", // Host on which Vertica is currently running
  "numPartitions" -> "source.numPartitions" // Number of partitions to create in the resulting DataFrame
)
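In a real job you would probably not hardcode these values. One possible approach, sketched below in plain Scala, is to build the same map from environment variables and fail early if something is missing. The VerticaSettings object and the VERTICA_* variable names are hypothetical; substitute whatever your deployment actually uses.

```scala
object VerticaSettings {
  // Connector option name -> hypothetical environment variable that supplies it.
  private val required: Seq[(String, String)] = Seq(
    "db"       -> "VERTICA_DB",
    "user"     -> "VERTICA_USER",
    "password" -> "VERTICA_PASSWORD",
    "table"    -> "VERTICA_TABLE",
    "dbschema" -> "VERTICA_SCHEMA",
    "host"     -> "VERTICA_HOST"
  )

  // Returns the options map, or a message listing the variables that are missing.
  def fromEnv(env: Map[String, String]): Either[String, Map[String, String]] = {
    val missing = required.collect { case (_, key) if !env.contains(key) => key }
    if (missing.nonEmpty) Left(s"Missing settings: ${missing.mkString(", ")}")
    else {
      val base = required.map { case (option, key) => option -> env(key) }.toMap
      // numPartitions is passed along only when it is explicitly set.
      val optional = env.get("VERTICA_NUM_PARTITIONS").map("numPartitions" -> _).toSeq
      Right(base ++ optional)
    }
  }
}
```

Calling VerticaSettings.fromEnv(sys.env) then gives either a ready-to-use properties map or a readable error before Spark is even started.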
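The next thing is to create a SparkSession to connect with Spark and Spark-SQL:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .config(new SparkConf().setAll(config)) // config: additional Spark settings as key/value pairs
  .appName(appName)
  .master(master)
  .getOrCreate()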
Here “appName” is the name you want to give your Spark application and “master” is the master URL for Spark. We are running Spark in local mode, hence:
master = "local"
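The session snippet above refers to config, appName and master without defining them. A minimal self-contained sketch could look like this; the SessionSketch object and the single setting inside config are assumptions for illustration, since the post does not say what config holds.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SessionSketch {
  // Hypothetical extra Spark settings; replace with whatever your job needs.
  val config: Map[String, String] = Map(
    "spark.sql.shuffle.partitions" -> "8"
  )

  // Builds (or reuses) a session with the given application name and master URL.
  def build(appName: String, master: String): SparkSession =
    SparkSession.builder()
      .config(new SparkConf().setAll(config))
      .appName(appName)
      .master(master)
      .getOrCreate()
}
```

For example, SessionSketch.build("vertica-to-kafka", "local") runs Spark locally with a single worker thread, while "local[*]" would use as many threads as there are cores on the machine.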
One last thing we need is a data source to provide to Spark for Vertica:
val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
Now, we can read the data as DataFrame from Vertica through the following code:
val verticaDF = sparkSession.read.format(verticaDataSource).options(properties).load()
To verify the data, we can use the “show” method of DataFrame, which prints the first rows of the DataFrame to the console:
val numRows: Int = 20 // illustrative value: how many rows to print
verticaDF.show(numRows)
That completes reading the data. The next step is to save this data into Kafka.
To save data into Kafka we again need to set up some properties for Kafka:
val kafkaSinkProperties = Map(
  "kafka.bootstrap.servers" -> "brokers-host:brokers-port", // Host and port of the Kafka broker
  "topic" -> "sinkTopic" // Name of the topic to populate with the data
)
We also need the data source name for Kafka in order to save data into it:
val kafkaDataSource = "kafka"
Now, all we need to do is save the DataFrame which we created from Vertica. The following code saves the DataFrame into Kafka:
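Here is a minimal sketch of that write. One thing to keep in mind is that Spark's Kafka sink expects the outgoing DataFrame to have a column named value (string or binary), so this sketch first packs every column of a row into a single JSON string. The KafkaSinkSketch object and its two helper names are illustrative, and JSON is just one possible encoding; it is an assumption, not something dictated by Vertica or Kafka.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object KafkaSinkSketch {
  // Packs all columns of each row into one JSON string column named "value",
  // which is the column the Kafka sink reads the record payload from.
  def toKafkaRecords(df: DataFrame): DataFrame =
    df.select(to_json(struct(df.columns.map(col): _*)).as("value"))

  // Writes the DataFrame to Kafka as a batch job.
  // `options` must contain "kafka.bootstrap.servers" and "topic".
  def writeToKafka(df: DataFrame, options: Map[String, String]): Unit =
    toKafkaRecords(df)
      .write
      .format("kafka")
      .options(options)
      .mode("append")
      .save()
}
```

With the names used in this post, the call would be KafkaSinkSketch.writeToKafka(verticaDF, kafkaSinkProperties).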
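Here the mode is set to “append” so that the new records are added to whatever the topic already holds. Note that Spark's Kafka sink accepts only “append” and the default “errorifexists”; “overwrite” and “ignore” are rejected with an error. With that, the data is saved into Kafka.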
One last thing is to stop the SparkSession after all the operations are done:
sparkSession.stop()
And with this, we are done. Yup, that’s all there is to it.
So in this blog, we got to know a bit more about Vertica and how Spark connects to it. We fetched data from Vertica and saved it into Kafka. In future installments, we will do the reverse, i.e., read from Kafka and write the data into Vertica. We will also try out Structured Streaming with some compatible use cases.
Like, comment, share and stay tuned. 😉