Partition-Aware Data Loading in Spark SQL


Data loading, in Spark SQL, means loading data into the memory/cache of the Spark worker nodes. For this we usually write code like the following:

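import java.util.Properties

// Credentials passed to the JDBC driver (placeholder values)
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
// Read the whole table through a single JDBC connection
val jdbcDF = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.table", connectionProperties)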

Here we use the jdbc function of the DataFrameReader API of Spark SQL to load the data from the table into the Spark executors' memory, no matter how many rows the table has.

Here is an example of such a jdbc call:

val df = spark.read.jdbc("jdbcUrl", "person", connectionProperties)

In the code above, the data is loaded into the Spark cluster, but only one worker iterates over the table and tries to load all of the data into its memory, because only one partition is created. That might work if the table contains a few hundred thousand records. The following snapshot of the Spark UI confirms this:

[Screenshot: Spark UI, single-partition load]

But what happens if the table that needs to be loaded into the Spark cluster contains 10, 20 or 50 million rows? In that case, loading the whole table on one Spark worker node is not very efficient, as it takes both more time and more memory.

To improve this situation, we can load the table into the Spark cluster in a partition-aware manner, i.e., ask each worker node to pick up its own partition of the table from the database and load it into its memory. For that we use the same jdbc function of the DataFrameReader API, but in a different way, something like this:

val df = spark.read.jdbc("jdbcUrl", "person", "personId", 0, 2944089, 4, connectionProperties)

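In the above code, we have provided the personId column, along with a lower bound, an upper bound and the number of partitions (4), which Spark uses to partition the rows. The range is cut into strides of roughly 736,022 IDs, so Partition 1 contains the rows with IDs below about 736,022, Partition 2 the next 736,022 or so IDs, and so on; the exact boundaries can differ slightly between Spark versions. Note that the bounds only decide the stride and do not filter rows: IDs below the lower bound end up in the first partition and IDs above the upper bound in the last one. You can increase the number of partitions, but keep in mind that the more partitions there are, the more connections Spark opens to the database to fetch the data.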
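The slicing described above can be sketched in plain Scala. This is a simplified model of range partitioning on a numeric column, not Spark's own code: the object name PartitionPredicates and its predicates method are hypothetical, and the boundaries a given Spark version generates may differ by a small amount.

```scala
// Simplified model of range partitioning on a numeric column.
// Hypothetical helper for illustration; not Spark's actual implementation.
object PartitionPredicates {

  // Returns one WHERE-clause predicate per partition.
  def predicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    require(lower < upper, "lower bound must be below upper bound")

    // Integer stride: the width of each slice of the ID range.
    val stride = upper / numPartitions - lower / numPartitions

    (0 until numPartitions).map { i =>
      val start = lower + i * stride // inclusive start of this slice
      val end = start + stride       // exclusive end of this slice
      if (numPartitions == 1) "1 = 1" // a single partition reads everything
      else if (i == 0) s"$column < $end OR $column IS NULL" // open below
      else if (i == numPartitions - 1) s"$column >= $start" // open above
      else s"$column >= $start AND $column < $end"
    }
  }

  def main(args: Array[String]): Unit =
    predicates("personId", 0L, 2944089L, 4).foreach(println)
}
```

The first and last predicates are open-ended, which is why rows outside the given bounds are still loaded rather than dropped.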

Now, when Spark loads the data from the person table, each worker node fetches only the partition that the driver (master) asks it to load. The following snapshot of the Spark UI confirms that each node loads only one partition from the database:

[Screenshot: Spark UI, load split across four partitions]

Here we can see that the person table is split into 4 partitions, which are loaded by different executors in parallel.
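The same check can be made from code instead of the Spark UI. The following is a minimal sketch that assumes Spark SQL is on the classpath and that df is a DataFrame returned by one of the jdbc calls above; the object name PartitionCheck and its method names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.spark_partition_id

// Hypothetical helper for inspecting how a DataFrame is partitioned.
object PartitionCheck {

  // Number of partitions backing the DataFrame.
  def partitionCount(df: DataFrame): Int =
    df.rdd.getNumPartitions

  // Row count per partition. Calling show() or collect() on the result
  // runs a Spark job that reads the data from the database.
  def rowsPerPartition(df: DataFrame): DataFrame =
    df.groupBy(spark_partition_id().as("partition"))
      .count()
      .orderBy("partition")
}
```

For the partition-aware read, PartitionCheck.partitionCount(df) should report 4, and PartitionCheck.rowsPerPartition(df).show() gives a quick view of whether the rows are spread evenly or skewed towards a few partitions.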

In this way we can ease the pain of loading large amounts of data from an RDBMS into a Spark cluster, by making use of partition-aware data loading in Spark.

If you have any questions or suggestions, please leave a comment!



