Concept of UDF in Spark: User-Defined Function


As we all know, Spark ships with a wide variety of built-in functions through which you can perform almost any transformation on your data frame and achieve your desired output. But sometimes you may find that they don’t cover your use case. Then what?

In that case, you can define your own functions, known as UDFs (User-Defined Functions), which make it possible to write your own transformations in Python or Scala, and you can even use external libraries.
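As a quick taste of that last point, here is a minimal, hypothetical sketch of a UDF that delegates to an external library (commons-lang3, which Spark already bundles on its classpath); the names capitalizeUDF and namesDF are purely illustrative:

import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.commons.lang3.StringUtils

// a UDF that delegates to an external library function
val capitalizeUDF = udf((s: String) => StringUtils.capitalize(s))

val namesDF = Seq("alice", "bob").toDF("name")
namesDF.withColumn("capitalized", capitalizeUDF(col("name"))).show()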

Let’s see how we can create them.

How to create a UDF?

Creating a UDF is relatively easy. You can think of a UDF as a function that the user defines to operate on Spark data frames or datasets. It allows users to define their own custom logic and apply it to their data to perform complex operations not available in Spark’s built-in functions.

User-Defined Functions can be brought to reality via different programming languages such as Java, Python, R, and my personal favorite, Scala. A UDF can take any number of arguments and returns a single value. One more advantage is that UDFs can be registered with Spark so that they can be used in SQL queries as well as DataFrame and Dataset operations.
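For instance, here is a minimal sketch of that registration step, assuming a SparkSession named spark is available (as in the examples below); the function name "multiply" is purely illustrative:

// register the function under a name so it can be called from SQL
spark.udf.register("multiply", (x: Int, y: Int) => x * y)

spark.sql("SELECT multiply(3, 4) AS product").show()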

Visualizing UDF in Spark

In Spark, a UDF can be defined in different languages, as follows:

Scala:

import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}

// define a UDF that multiplies two integers
val aMultiplier = udf((x: Int, y: Int) => x * y)

val randomNumbers = Seq((2, 4), (8, 10))
val columns = Seq("x", "y")

// build a DataFrame and apply the UDF to derive a new column
val easyDF = spark.createDataFrame(randomNumbers).toDF(columns:_*)
val dfWithU = easyDF.withColumn("z", aMultiplier(col("x"), col("y")))
dfWithU.show()

Python:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# define a plain Python function that multiplies two numbers
def aMultiplier(x, y):
    return x * y

# wrap it as a Spark UDF with an explicit return type
multiply_udf = udf(aMultiplier, IntegerType())

df = spark.createDataFrame([(2, 5), (6, 10)], ["num1", "num2"])
df = df.withColumn("output", multiply_udf(df.num1, df.num2))
df.show()

Output ->

+----+----+------+
|num1|num2|output|
+----+----+------+
|   2|   5|    10|
|   6|  10|    60|
+----+----+------+

In the above examples, we have defined a UDF called ‘aMultiplier’ which takes two parameters as input and returns their product. We then make this UDF known to Spark using the udf() function or the spark.udf.register() method, depending on how we intend to use it. Finally, we use the UDF to create a new column in a DataFrame or Dataset.

Moving ahead, let’s discuss the types of UDFs we have.

Types of UDFs

Basically, there are two main types of UDFs to discuss in our beloved Apache Spark:

  • Scalar UDFs:
    UDFs that take one or more input columns and return a single output column are known as Scalar UDFs. Simple & elegant.
    Example:
import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}

// a scalar UDF that cubes a number
val cubicUDF = udf((num: Long) => num * num * num)

// apply it to the ids 1 through 9
val scalarDf = spark.range(1, 10).select(cubicUDF(col("id")) as "cubic_IDs")
scalarDf.show()

Output ->

+---------+
|cubic_IDs|
+---------+
|        1|
|        8|
|       27|
|       64|
|      125|
|      216|
|      343|
|      512|
|      729|
+---------+

  • Vector UDFs:
    A Vector in Spark is a dense or sparse vector of doubles, typically used to represent a feature vector in machine learning applications. Vector UDFs allow us to define custom functions that operate on Vector columns in Spark DataFrames, as in the sketch below.
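As a minimal, hypothetical sketch of this idea (the name vectorNorm and the sample data are illustrative, not part of Spark’s API), here is a Vector UDF that computes the L2 norm of a feature vector:

import spark.implicits._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}

// hypothetical UDF: computes the L2 (Euclidean) norm of a feature vector
val vectorNorm = udf((v: Vector) => math.sqrt(v.toArray.map(x => x * x).sum))

val vecDF = Seq(
  Tuple1(Vectors.dense(3.0, 4.0)),
  Tuple1(Vectors.dense(1.0, 2.0, 2.0))
).toDF("features")

vecDF.withColumn("l2_norm", vectorNorm(col("features"))).show()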

Performance Association with UDF

Using UDFs can be an expensive process, as they may require data serialization and deserialization. Therefore, it’s important to use UDFs judiciously and only when built-in functions cannot meet your requirements.

Let’s discuss these implications in detail:

  1. Serialization and Deserialization Overhead:
    When using UDFs, data has to be serialized and deserialized between the JVM and the user-defined function, which leads to significant performance overhead (see the sketch after this list).
  2. Garbage Collection:
    UDFs can create temporary objects that accumulate on the JVM heap, leading to garbage-collection overhead.
  3. Resource Utilization:
    UDFs can easily chew up loads of your resources, such as CPU and memory. So it’s necessary that you carefully tune Spark configuration parameters such as spark.driver.memory, spark.executor.memory, spark.executor.cores, and spark.executor.instances.
  4. Data Skew:
    UDFs can cause data skew, where some partitions have significantly more data than others. This can result in performance degradation and resource contention.
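To see why this matters in practice, here is a minimal sketch (reusing easyDF and aMultiplier from the earlier Scala example) contrasting the UDF with an equivalent built-in Column expression; the built-in version stays visible to Spark’s Catalyst optimizer and avoids the serialization overhead described above:

import org.apache.spark.sql.functions.col

// UDF version: opaque to the optimizer and incurs (de)serialization overhead
val dfWithUdf = easyDF.withColumn("z", aMultiplier(col("x"), col("y")))

// built-in version: a plain Column expression that Catalyst can optimize
val dfBuiltin = easyDF.withColumn("z", col("x") * col("y"))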

Conclusion

While building your data pipeline, UDFs can provide a powerful way to perform complex data processing and analysis. However, you have to use them cleverly and carefully so that they don’t cause any performance issues.
That’s it for this blog. Thanks for making it to the end, and if you like my style of writing, please do check out my other blogs too by clicking here.


Written by 

Hi community, I am Raviyanshu from Dehradun, a tech enthusiast trying to make something useful with the help of 0s & 1s.