Spark: Streaming Datasets


Spark provides a high-level API, the Dataset, which gives us type safety and lets us manipulate data the same way, without code changes, whether the code runs locally or on a distributed cluster. Spark Structured Streaming, Spark's high-level API for stream processing, also lets us stream a Dataset, which gives us a type-safe structured stream. In this blog, we will see how to create a type-safe structured stream using Spark.

To create a type-safe structured stream, we first need to read a Dataset. We will read it from a socket fed by the NetCat utility: we paste some JSON data into NetCat, and Spark turns it into a streaming Dataset. Let's first create the entry point to our structured Dataset, i.e., the SparkSession.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Streaming Datasets")
  .master("local[2]") // run locally with 2 threads
  .getOrCreate()

Create Streaming Dataset

Streaming Datasets can be created through the DataStreamReader interface returned by SparkSession.readStream().

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, from_json}
// include encoders for DF -> DS transformations
import spark.implicits._

def readCars(): Dataset[Car] = {
  spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 12345)
    .load() // DF with a single string column "value"
    .select(from_json(col("value"), carsSchema).as("car")) // composite (struct) column
    .selectExpr("car.*") // DF with one column per JSON field
    .as[Car] // the Car encoder is supplied implicitly by spark.implicits._
}

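In the snippet above, the readCars() method uses spark.readStream to read the data that we paste into the NetCat program. Start NetCat with the command below before starting the query: the socket source connects to localhost:12345 as a client, so a server must already be listening there (-l makes nc listen, -k keeps it listening after a client disconnects).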

nc -lk 12345

Then paste some JSON data into the NetCat program. For example:

{"Name":"chevrolet impala", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":454, "Horsepower":220, "Weight_in_lbs":4354, "Acceleration":9, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"plymouth fury iii", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":440, "Horsepower":215, "Weight_in_lbs":4312, "Acceleration":8.5, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"pontiac catalina", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":455, "Horsepower":225, "Weight_in_lbs":4425, "Acceleration":10, "Year":"1970-01-01", "Origin":"USA"}

load() returns a DataFrame with a single string column, “value”, holding one row per line received. To turn that column into a composite (struct) column, we pass it to from_json together with the schema of the JSON data:

import org.apache.spark.sql.types._

val carsSchema = StructType(Array(
  StructField("Name", StringType),
  StructField("Miles_per_Gallon", DoubleType),
  StructField("Cylinders", LongType),
  StructField("Displacement", DoubleType),
  StructField("Horsepower", LongType),
  StructField("Weight_in_lbs", LongType),
  StructField("Acceleration", DoubleType),
  StructField("Year", StringType),
  StructField("Origin", StringType)
))
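Because the socket stream only delivers data while NetCat is running, it can help to check the parsing step on a static DataFrame first. The sketch below is a minimal batch version, assuming the same carsSchema: it builds a DataFrame with a single "value" column, mimicking what load() returns, and then applies the same from_json and selectExpr("car.*") calls. The names ParseJsonSketch and parse are hypothetical helpers for illustration, not part of the post's code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical helper object: a batch sketch of the parsing step, not the post's code
object ParseJsonSketch {
  // Same schema as carsSchema above
  val carsSchema: StructType = StructType(Array(
    StructField("Name", StringType),
    StructField("Miles_per_Gallon", DoubleType),
    StructField("Cylinders", LongType),
    StructField("Displacement", DoubleType),
    StructField("Horsepower", LongType),
    StructField("Weight_in_lbs", LongType),
    StructField("Acceleration", DoubleType),
    StructField("Year", StringType),
    StructField("Origin", StringType)
  ))

  lazy val spark: SparkSession = SparkSession.builder()
    .appName("ParseJsonSketch")
    .master("local[2]")
    .getOrCreate()

  // Parses JSON lines the same way readCars() does, but from a static DataFrame
  def parse(lines: Seq[String]): DataFrame = {
    import spark.implicits._
    lines.toDF("value") // single string column "value", like the socket source
      .select(from_json(col("value"), carsSchema).as("car")) // one struct column "car"
      .selectExpr("car.*") // flatten the struct into one column per field
  }

  def main(args: Array[String]): Unit = {
    val parsed = parse(Seq(
      """{"Name":"chevrolet impala", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":454, "Horsepower":220, "Weight_in_lbs":4354, "Acceleration":9, "Year":"1970-01-01", "Origin":"USA"}"""
    ))
    parsed.printSchema() // one column per field in carsSchema
    parsed.show(false)
    spark.stop()
  }
}
```

The select/selectExpr chain is identical to the one in readCars(); only the source differs, which is the point of the Dataset API running unchanged on batch and streaming data.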

Finally, we convert the DataFrame into a Dataset with the as[Car] function, which takes a type argument. Here the type is Car, a case class whose fields match the schema above (the selectExpr("car.*") call has already flattened the struct into matching top-level columns):

// Option fields accept nulls, e.g. when a JSON field is missing
case class Car(
  Name: String,
  Miles_per_Gallon: Option[Double],
  Cylinders: Option[Long],
  Displacement: Option[Double],
  Horsepower: Option[Long],
  Weight_in_lbs: Option[Long],
  Acceleration: Option[Double],
  Year: String,
  Origin: String
)
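Why Option fields? A JSON record may omit a field, and from_json then produces null for it. The minimal batch sketch below, assuming the same schema and case class, parses a record with no Cylinders or Horsepower keys and converts it to a Dataset of Car; the missing values come back as None. With a plain Long field instead, Spark would fail at runtime when it tried to decode the null. OptionFieldsSketch, toCars, and the "example car" record are hypothetical, for illustration only.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical helper object: a batch sketch, not the post's code
object OptionFieldsSketch {
  // Same shape as the post's Car; Option fields can hold missing (null) values
  case class Car(
    Name: String,
    Miles_per_Gallon: Option[Double],
    Cylinders: Option[Long],
    Displacement: Option[Double],
    Horsepower: Option[Long],
    Weight_in_lbs: Option[Long],
    Acceleration: Option[Double],
    Year: String,
    Origin: String
  )

  val carsSchema: StructType = StructType(Array(
    StructField("Name", StringType),
    StructField("Miles_per_Gallon", DoubleType),
    StructField("Cylinders", LongType),
    StructField("Displacement", DoubleType),
    StructField("Horsepower", LongType),
    StructField("Weight_in_lbs", LongType),
    StructField("Acceleration", DoubleType),
    StructField("Year", StringType),
    StructField("Origin", StringType)
  ))

  lazy val spark: SparkSession = SparkSession.builder()
    .appName("OptionFieldsSketch")
    .master("local[2]")
    .getOrCreate()

  def toCars(lines: Seq[String]): Dataset[Car] = {
    import spark.implicits._
    lines.toDF("value")
      .select(from_json(col("value"), carsSchema).as("car"))
      .selectExpr("car.*")
      .as[Car] // null columns become None in the Option fields
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical record with no "Cylinders" or "Horsepower" keys
    val cars = toCars(Seq("""{"Name":"example car", "Year":"1970-01-01", "Origin":"USA"}""")).collect()
    cars.foreach(println)
    spark.stop()
  }
}
```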

To convert a DataFrame into a Dataset, Spark needs an Encoder. An Encoder tells Spark how to convert a row into a JVM object, here an object of type Car. We can either pass one explicitly, as in as[Car](carEncoder), or let the compiler find an implicit one, which import spark.implicits._ provides for case classes. Either way, readCars() returns a Dataset of type Car.
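To make the explicit route concrete, here is a minimal batch sketch, assuming a Car case class with the same fields as the post's. It obtains the encoder with Encoders.product, passes it as as[Car](carEncoder) without importing spark.implicits._, and reuses carEncoder.schema as the from_json schema, since an encoder also carries the schema of the type it encodes. ExplicitEncoderSketch and toCars are hypothetical names.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical helper object: a batch sketch of explicit encoders, not the post's code
object ExplicitEncoderSketch {
  case class Car(
    Name: String,
    Miles_per_Gallon: Option[Double],
    Cylinders: Option[Long],
    Displacement: Option[Double],
    Horsepower: Option[Long],
    Weight_in_lbs: Option[Long],
    Acceleration: Option[Double],
    Year: String,
    Origin: String
  )

  // Explicitly derived encoder; it also exposes the Spark schema of Car
  val carEncoder: Encoder[Car] = Encoders.product[Car]

  lazy val spark: SparkSession = SparkSession.builder()
    .appName("ExplicitEncoderSketch")
    .master("local[2]")
    .getOrCreate()

  def toCars(lines: Seq[String]): Dataset[Car] =
    spark.createDataset(lines)(Encoders.STRING) // column "value", no implicits needed
      .select(from_json(col("value"), carEncoder.schema).as("car"))
      .selectExpr("car.*")
      .as[Car](carEncoder) // encoder passed explicitly

  def main(args: Array[String]): Unit = {
    println(carEncoder.schema.treeString) // field names and types derived from Car
    toCars(Seq(
      """{"Name":"plymouth fury iii", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":440, "Horsepower":215, "Weight_in_lbs":4312, "Acceleration":8.5, "Year":"1970-01-01", "Origin":"USA"}"""
    )).show(false)
    spark.stop()
  }
}
```

Deriving the schema from the case class keeps the two in sync; the hand-written carsSchema in the post is more explicit about the column types, which some readers may prefer.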

Now let's apply a transformation to this Dataset: we will show the car names while preserving type information.

def showCarNames(): Unit = {
  val carsDS: Dataset[Car] = readCars()

  // collection transformations maintain type info
  val carNames: Dataset[String] = carsDS.map(_.Name)

  carNames.writeStream
    .format("console") // print each micro-batch to stdout
    .outputMode("append")
    .start()
    .awaitTermination() // block until the query stops
}
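Because the console sink needs a live NetCat session, showCarNames() is awkward to check automatically. Below is a minimal sketch of one alternative, assuming Spark 3.x: MemoryStream, an internal testing source in org.apache.spark.sql.execution.streaming whose signature may change between versions, stands in for the socket (it also yields a single "value" column), and the built-in memory sink collects the results into a queryable table. StreamingCarNamesSketch, streamCarNames, and the table name car_names are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical helper object: a testing sketch of showCarNames(), not the post's code
object StreamingCarNamesSketch {
  case class Car(
    Name: String,
    Miles_per_Gallon: Option[Double],
    Cylinders: Option[Long],
    Displacement: Option[Double],
    Horsepower: Option[Long],
    Weight_in_lbs: Option[Long],
    Acceleration: Option[Double],
    Year: String,
    Origin: String
  )

  val carsSchema: StructType = StructType(Array(
    StructField("Name", StringType),
    StructField("Miles_per_Gallon", DoubleType),
    StructField("Cylinders", LongType),
    StructField("Displacement", DoubleType),
    StructField("Horsepower", LongType),
    StructField("Weight_in_lbs", LongType),
    StructField("Acceleration", DoubleType),
    StructField("Year", StringType),
    StructField("Origin", StringType)
  ))

  lazy val spark: SparkSession = SparkSession.builder()
    .appName("StreamingCarNamesSketch")
    .master("local[2]")
    .getOrCreate()

  // Feeds JSON lines through the same typed pipeline and returns the car names
  def streamCarNames(lines: Seq[String]): Seq[String] = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

    val input = MemoryStream[String] // stands in for the socket source
    val carsDS: Dataset[Car] = input.toDF() // single string column "value"
      .select(from_json(col("value"), carsSchema).as("car"))
      .selectExpr("car.*")
      .as[Car]

    val query = carsDS.map(_.Name) // typed transformation, as in showCarNames()
      .writeStream
      .format("memory") // keep results in an in-memory table
      .queryName("car_names") // name of that table
      .outputMode("append")
      .start()

    input.addData(lines: _*)
    query.processAllAvailable() // wait until all added data is processed
    val names = spark.table("car_names").as[String].collect().toSeq
    query.stop()
    names
  }

  def main(args: Array[String]): Unit = {
    streamCarNames(Seq(
      """{"Name":"chevrolet impala", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":454, "Horsepower":220, "Weight_in_lbs":4354, "Acceleration":9, "Year":"1970-01-01", "Origin":"USA"}""",
      """{"Name":"pontiac catalina", "Miles_per_Gallon":14, "Cylinders":8, "Displacement":455, "Horsepower":225, "Weight_in_lbs":4425, "Acceleration":10, "Year":"1970-01-01", "Origin":"USA"}"""
    )).foreach(println)
    spark.stop()
  }
}
```

The same harness works for other typed transformations: swapping map(_.Name) for, say, filter(_.Horsepower.exists(_ > 200)) and comparing the collected rows would exercise a typed filter the same way.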

You can find the complete code here.

Happy blogging!!