In Spark 2.3, it added support for stream-stream joins, i.e, we can join two streaming Datasets/DataFrames and in this blog we are going to see how beautifully spark now give support for joining the two streaming dataframes.

I this example, I am going to use

Apache Spark 2.3.0
Apache Kafka 0.11.0.1
Scala 2.11.8

The build.sbt looks like the following:-

scalaVersion := "2.11.8"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1")

To create the two streaming dataframes, I am going to send the data to Kafka with some regular time interval in two separate topics, here I name them as ‘dataTopic1’ & ‘dataTopic2’.

For sending the data, first I simply make the list of integers & send these integers from the list to Kafka topic with some regular time intervals, as follows.

val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

In the same way, I send the data to 2nd Kafka topic called “dataTopic2”.

val records2 = (5 to 15).toList
records2.foreach(record => {
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
})

After sending data to Kafka, I start reading the dataframe from the topic like I read the dataFrame1 from Kafka topic “dataTopic1” as follow:-

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
val streamingDf1 = dataFrame1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")

In the same way, I read the dataFrame2 from Kafka topic “dataTopic2”. After that, apply the join on the dataframe column called “data” as follows:-

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This will join these two streaming dataframes into one, whose schema is as follow:-

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)

Since it found the same value of “data” in both the dataframes, it will give us the 2 timeStamps, one for each of the data. Because of this, the final output is as follows:-

image

If you want to run this code on your own, you can find this example here – GitHub repo. Before running this example, make sure your Kafka server is already running.

Pasted image at 2017_11_27 04_17 PM

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

w

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.