Partition-Aware Data Loading in Spark SQL


In Spark SQL, data loading means loading data into the memory/cache of the Spark worker nodes. We usually write code like the following to do that:

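import java.util.Properties

// Credentials for the JDBC connection
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
// Read the table through the DataFrameReader's jdbc function
val jdbcDF = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.table", connectionProperties)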

Here we use the jdbc function of Spark SQL's DataFrameReader API to load the data from the table into the Spark executors' memory, no matter how many rows the table contains.

Here is an example of a jdbc call:

// "jdbcUrl" stands in for the real JDBC connection string
val df = spark.read.jdbc("jdbcUrl", "person", connectionProperties)

The code above loads the data into the Spark cluster. However, only one partition is created, so a single worker iterates over the table and tries to load all of the data into its memory. That may work if the table contains a few hundred thousand records. The following snapshot of the Spark UI confirms this:

[Image: Spark UI snapshot, blog-single-stage-new]

But what happens if the table that needs to be loaded into the Spark cluster contains 10, 20, or 50 million rows? In that case, loading the whole table on one Spark worker node is not efficient, as it takes both more time and more memory.

To improve this situation, we can load the table into the Spark cluster in a partition-aware manner, i.e., ask each worker node to pick up its own partition of the table from the database and load it into its memory. For that we use the same jdbc function of the DataFrameReader API, but in a different way, something like this:

// Arguments: url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties
val df = spark.read.jdbc("jdbcUrl", "person", "personId", 0, 2944089, 4, connectionProperties)

In the code above, we provide the personId column along with the minimum and maximum IDs, which Spark uses to partition the rows: Partition 1 contains rows with IDs from roughly 0 to 736022, Partition 2 contains rows with IDs from roughly 736023 to 1472045, and so on. Note that the two bounds only determine the partition stride; they do not filter rows, so rows with IDs outside the range still end up in the first or last partition. You can increase the number of partitions, but keep in mind that the more partitions there are, the more connections Spark opens to fetch the data.
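To make the partitioning arithmetic concrete, here is a minimal sketch in plain Scala (no Spark required) of how per-partition filters can be derived from the bounds. It is modeled on the integer-division stride used by Spark 2.x for a numeric partition column; the object name PartitionSketch, the function whereClauses, and the "1 = 1" fallback are hypothetical, and other Spark versions may place the boundaries slightly differently.

```scala
// Sketch only: approximates how a numeric partition column, two bounds and a
// partition count turn into one filter per partition. Not Spark's actual code.
object PartitionSketch {

  /** Returns one WHERE clause per partition for the given column and bounds. */
  def whereClauses(column: String,
                   lowerBound: Long,
                   upperBound: Long,
                   numPartitions: Int): Seq[String] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lowerBound < upperBound, "lowerBound must be less than upperBound")

    // Integer division on both bounds gives the width of each partition.
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      val end = start + stride
      // The first partition has no lower limit and the last has no upper limit,
      // so rows outside [lowerBound, upperBound] are still read.
      val lower = if (i == 0) None else Some(s"$column >= $start")
      val upper = if (i == numPartitions - 1) None else Some(s"$column < $end")
      (lower, upper) match {
        case (None, Some(u))    => s"$u OR $column IS NULL"
        case (Some(l), Some(u)) => s"$l AND $u"
        case (Some(l), None)    => l
        case (None, None)       => "1 = 1" // stand-in: a single partition needs no filter
      }
    }
  }

  def main(args: Array[String]): Unit =
    whereClauses("personId", 0L, 2944089L, 4).foreach(println)
}
```

For the values used in this post, the sketch yields a stride of 736022 and four filters: IDs below 736022 (plus NULLs), 736022 up to 1472044, 1472044 up to 2208066, and 2208066 and above. Each filter corresponds to one query, and therefore one connection, against the database.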

Now, when Spark loads the data from the person table, each worker node fetches only the partition that the master node asks it to load. The following snapshot of the Spark UI confirms that each node loads only one partition from the database:

[Image: Spark UI snapshot, blog-multi-stage]

Here we can see that the person table is split into 4 partitions, which are loaded by different executors in parallel.
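The same partitioned read can also be described with the named options of Spark's JDBC data source, which some readers may find easier to follow than positional arguments. The sketch below only builds the option map in plain Scala; the object name JdbcPartitionOptions is hypothetical, the URL is the placeholder from the first example, and the actual read is shown as a comment because it needs a live SparkSession and database.

```scala
// Sketch only: named JDBC data source options equivalent to the partitioned
// jdbc(...) call in this post. The URL is a placeholder, not a real server.
object JdbcPartitionOptions {
  val options: Map[String, String] = Map(
    "url"             -> "jdbc:postgresql:dbserver",
    "dbtable"         -> "person",
    "partitionColumn" -> "personId", // column used to split the table
    "lowerBound"      -> "0",        // used only to compute the stride
    "upperBound"      -> "2944089",  // used only to compute the stride
    "numPartitions"   -> "4"         // also caps the number of parallel connections
  )

  // With a SparkSession named `spark` in scope, the read would look like:
  //   val df = spark.read.format("jdbc").options(JdbcPartitionOptions.options).load()
  // Credentials would be passed as further "user" and "password" options.
}
```

Keeping the options in a map makes it easy to change numPartitions per environment without touching the read itself.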

In this way we can ease the pain of loading large data from an RDBMS into a Spark cluster by leveraging partition-aware data loading.

If you have any doubts or suggestions, kindly leave a comment!



Written by 

Himanshu Gupta is a software architect with more than 9 years of experience. He is always keen to learn new technologies. He likes not only programming languages but data analytics too. He has sound knowledge of "Machine Learning" and "Pattern Recognition". He believes that the best results come when everyone works as a team. In his free time he likes coding, listening to music, watching movies, and reading science fiction books.

7 thoughts on “Partition-Aware Data Loading in Spark SQL”

  1. A few questions.
    Glad that you covered this. It will clear my doubt. Exploring

    First of all, I find the term "partition" a little confusing alongside data distribution. In traditional MPP databases, data is distributed across the cluster and then partitioned within each storage node. In Hive also, when we talk about partitions, the partition is within each node's storage, not for data distribution across nodes.

    If I understand correctly, Spark allows us to re-partition and make another copy of the same RDD: it is saved with one partitioning scheme during data load, but later it can go through another transformation and be re-partitioned, and a copy of the RDD with the new partitioning can be saved/cached.

    Will discuss further if I get a response. Thanks in advance.

    1. Hi Sasanka,

      Thanks for providing feedback. I agree with your point that in Traditional MPP databases, data is distributed across nodes and each storage node has its own part of data. Like Hive, HDFS, etc.

      But Postgres is not a distributed database. It keeps its data on a single node. Because of this, when we pull data from it into Spark, it gets pulled into a single partition. Of course, once the data is loaded into cache, we can distribute it across the cluster via re-partitioning. But until then it is loaded into cache in a single partition, which creates a problem if the amount of data in Postgres is too large (as explained above). To overcome this issue, we need to specify partitions on our own (again, in the way mentioned in the post).

      I hope this answers your question, Sasanka. If you have any further query then please do comment.

      Regards

  2. Hi Himanshu ,
    Thanks for the reply.
    My question was not about the extraction/source DB but about data ingestion into Spark. From your answer I think I got what I wanted, so I will put a follow-up question, but before that, to repeat: I feel data distribution and data partitioning are ideally two separate things. Data distribution is the process through which one distributes data among the nodes in a cluster, i.e., it is coarse-grained. Data partitioning is how data is further arranged in compartments within each node using a partitioning scheme, i.e., it is more granular.

    My follow up question is
    ——————————
    Then can I say that, like Teradata and Netezza, we can co-locate data for joins?
    This question arises because SnappyData mentions that it is X times faster than Spark, and the most important point is that, unlike Spark, in most cases it does not need to shuffle data because it stores data in a co-located way. But if I understand your answer, Spark can also co-locate data using a “partitioning scheme”.

    Regards
    sasanka ghosh

    1. I think I got your question wrong the first time, but now I understand it. It looks like you are looking for an answer to the question “Can we co-locate data in Spark?”. Correct me if I am wrong.

      So, the answer to your question is “YES”: Spark can co-locate data using the approach given above in the post. But it also depends on the shuffle operations (like join, distinct, etc.) that we write, as such an operation may still need to shuffle data, depending on its conditions.

      1. Hi Himanshu ,
        Thanks. Yes, I was looking for that answer.
        Regards
        sasanka ghosh
