Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames. In this blog we are going to learn about the Spark stream-stream join and see how neatly Spark supports joining two streaming DataFrames.
In this example, I am going to use:

- Apache Spark 2.3.0
- Apache Kafka 0.11.0.1
- Scala 2.11.8
The build.sbt looks like the following:

```scala
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1"
)
```
To create the two streaming DataFrames, I am going to send data to Kafka at a regular time interval on two separate topics, named "dataTopic1" and "dataTopic2".
For sending the data, I first build a list of integers and then send each integer from the list to the Kafka topic at a regular interval, as follows.
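The snippets below assume a Kafka producer has already been created. A minimal configuration sketch is shown here; the broker address `localhost:9092` is an assumption, so adjust it to your environment. The producer itself would then be created with `new KafkaProducer[String, String](props)`.

```scala
import java.util.Properties

// Assumed broker address; change it to match your Kafka setup.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
// Both key and value are sent as plain strings in this example.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
```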
```scala
val records1 = (1 to 10).toList

records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}
```
In the same way, I send data to the second Kafka topic, "dataTopic2".
```scala
val records2 = (5 to 15).toList

records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}
```
After sending the data to Kafka, I start reading the streaming DataFrames from the topics. I read dataFrame1 from the Kafka topic "dataTopic1" as follows:
```scala
val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load()
```
```scala
val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
  .as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")
```
In the same way, I read streamingDf2 from the Kafka topic "dataTopic2" (with its timestamp column named "timestamp2"). After that, I apply a join on the common DataFrame column "data", as follows:
```scala
val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")
```
This joins the two streaming DataFrames into one, whose schema is as follows:
```
root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)
```
Whenever the same value of "data" is found in both DataFrames, the joined row carries two timestamps, one from each side. The final output therefore contains one row per matched value, with both timestamps attached.
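As a quick sanity check on which values should match: the two topics carry the ranges 1 to 10 and 5 to 15, so only the overlapping values can appear in the joined output. A plain-collections sketch of the inner-join semantics (just an illustration, not Spark code):

```scala
// Topic contents from the producer snippets above.
val records1 = (1 to 10).toList
val records2 = (5 to 15).toList

// An inner join on "data" keeps only the values present on both sides.
val matched = records1.intersect(records2)
// matched == List(5, 6, 7, 8, 9, 10)
```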
If you want to run this code yourself, you can find the complete example in the GitHub repo. Before running it, make sure your Kafka server is already running.