Spark Stream-Stream Join


Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames. In this blog we are going to learn about Spark Stream-Stream Join and see how elegantly Spark now supports joining two streaming DataFrames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka 0.11.0.1
Scala 2.11.8

The build.sbt looks like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1")

To create the two streaming DataFrames, I am going to send data to Kafka at regular time intervals on two separate topics, which I name "dataTopic1" and "dataTopic2".

For sending the data, I first build a list of integers and then send each integer from the list to the Kafka topic at a regular interval, as follows:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// `producer` is a KafkaProducer[String, String] configured with the broker address
val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200) // pause so records arrive at regular intervals
}

In the same way, I send the data to 2nd Kafka topic called “dataTopic2”.

val records2 = (5 to 15).toList
records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

After sending the data to Kafka, I start reading a DataFrame from each topic. For example, I read dataFrame1 from the Kafka topic "dataTopic1" as follows:

import java.sql.Timestamp
import org.apache.spark.sql.functions.col
import spark.implicits._

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load()

// Keep the integer value plus its Kafka timestamp (renamed timestamp1)
val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))

In the same way, I read dataFrame2 from the Kafka topic "dataTopic2". After that, I apply a join on the common DataFrame column "data" as follows:
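That second read is a mirror image of the first; a minimal sketch, assuming the same `spark` session, imports, and `bootStrapServers` value as above:

```scala
// Mirror of the dataTopic1 read: subscribe to the second topic and
// keep the integer value plus its Kafka timestamp (renamed timestamp2).
val dataFrame2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic2")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf2 = dataFrame2
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp2"))
```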

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This joins the two streaming DataFrames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)
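To actually see the joined rows, the stream needs a sink; a minimal sketch using the console sink, assuming the streamingDfAfterJoin from above (the trigger settings are left at their defaults):

```scala
// Stream-stream inner joins support the append output mode,
// so each joined row is printed to the console exactly once.
val query = streamingDfAfterJoin.writeStream
  .format("console")
  .option("truncate", value = false)
  .outputMode("append")
  .start()

query.awaitTermination() // block until the query is stopped
```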

Wherever the same value of "data" is found in both DataFrames, the joined row contains two timestamps, one from each stream.

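Given the two ranges sent above (1 to 10 and 5 to 15), an inner join on "data" can only match the keys present in both streams; plain Scala makes the expected key set easy to check:

```scala
// Keys sent to each topic by the two producers above
val keys1 = (1 to 10).toSet // dataTopic1
val keys2 = (5 to 15).toSet // dataTopic2

// An inner join on "data" keeps only keys present on both sides
val joinedKeys = (keys1 intersect keys2).toList.sorted
println(joinedKeys) // List(5, 6, 7, 8, 9, 10)
```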

If you want to run this code on your own, you can find the complete example in the GitHub repo. Before running it, make sure your Kafka server is already running.


Written by 

Ayush is a Software Consultant with more than 1 year of experience. He specialises in Hadoop and has good knowledge of many programming languages like C, Java and Scala. HQL, Pig Latin, HDFS, Flume and HBase add to his forte. He is familiar with technologies like Scala, Spark, Kafka, Cassandra, DynamoDB, Akka and many more. His hobbies include playing football and biking.
