Data loading, in Spark SQL, means loading data into the memory/cache of the Spark worker nodes. To do this, we usually write code like the following:

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

val jdbcDF = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.table", connectionProperties)
Here we are using the jdbc function of Spark SQL's DataFrameReader API to load the data from a table into the Spark executors' memory, no matter how many rows the table contains.
Here is an example of loading the person table:

val df = spark.read.jdbc("jdbcUrl", "person", connectionProperties)
In the code above, the data will be loaded into the Spark cluster. But only one worker will iterate over the table and try to load the whole dataset into its memory, because only one partition is created. This might work if the table contains a few hundred thousand records. This fact can be confirmed by the following snapshot of the Spark UI:
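A quick way to confirm this in code is to check how many partitions Spark created for the DataFrame. This is a minimal sketch, assuming the df from the snippet above and a running Spark session:

// With the plain (url, table, properties) overload, Spark creates a
// single JDBC partition, so a single task reads the entire table.
val numPartitions = df.rdd.getNumPartitions
println(s"JDBC partitions: $numPartitions")  // expected: 1 for this overload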
But what will happen if the table that needs to be loaded into the Spark cluster contains 10, 20, or 50 million rows? In that case, loading the whole table into one Spark worker node will not be very efficient, as it will take both more time and more memory.
So, to improve this situation, we can load the table into the Spark cluster in a partition-aware manner, i.e., ask each worker node to pick up its own partition of the table from the database and load it into its own memory. For that we need to use the same jdbc function of the DataFrameReader API, but in a different way, something like this:
val df = spark.read.jdbc(
  "jdbcUrl",            // JDBC connection URL
  "person",             // table to load
  "personId",           // column to partition on
  0,                    // lower bound of the partition column
  2944089,              // upper bound of the partition column
  4,                    // number of partitions
  connectionProperties)
In the code above, we have provided the personId column, along with its minimum and maximum values, using which Spark will partition the rows, i.e., Partition 1 will contain rows with IDs from 0 to 736022, Partition 2 will contain rows with IDs from 736023 to 1472045, and so on. Note that the lower and upper bounds only decide the partition stride; rows outside that range are not filtered out, they simply land in the first or last partition. You can increase the number of partitions if you want, but keep in mind that the more partitions there are, the more connections Spark will open to the database to fetch the data.
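Instead of hard-coding the bounds, they can also be fetched from the database itself before the partitioned read. This is a sketch, assuming personId is a bigint column; the subquery alias is required by most JDBC sources:

import org.apache.spark.sql.Row

// Push a MIN/MAX query down to the database to get the real bounds
// of the partitioning column.
val Row(minId: Long, maxId: Long) = spark.read
  .jdbc("jdbcUrl",
        "(SELECT MIN(personId) AS lo, MAX(personId) AS hi FROM person) AS bounds",
        connectionProperties)
  .first()

// Use the fetched bounds for the partition-aware load.
val df = spark.read
  .jdbc("jdbcUrl", "person", "personId", minId, maxId, 4, connectionProperties)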
Now, when Spark loads the data from the person table, each worker node will fetch only the partition that the master node asks it to load. Following is a snapshot of the Spark UI which confirms that each node is loading only one partition from the database:
Here we can see that the person table is partitioned into 4 parts, which are being loaded by different executors in parallel.
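If the table has no evenly distributed numeric column to partition on, the DataFrameReader API also provides a jdbc overload that takes an array of WHERE-clause predicates, one per partition. A minimal sketch, where the country column and its values are hypothetical:

// One partition per predicate; each executor effectively runs
// SELECT * FROM person WHERE <predicate> against the database.
val predicates = Array(
  "country = 'US'",
  "country = 'IN'",
  "country NOT IN ('US', 'IN')"
)

val dfByCountry = spark.read
  .jdbc("jdbcUrl", "person", predicates, connectionProperties)

Make sure the predicates are mutually exclusive and together cover the whole table, otherwise rows will be duplicated or silently dropped.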
In this way, we can ease the pain of loading large datasets from an RDBMS into a Spark cluster by leveraging Spark's partition-aware data loading.
If you have any doubts or suggestions, kindly leave a comment!