Kafka is a message broker that passes messages between producers and consumers. Spark Structured Streaming, in contrast, consumes static and streaming data from sources such as Kafka, Flume, Twitter, or a plain socket, processes and analyses it with high-level algorithms (including machine learning), and pushes the results out to an external storage system. Its main advantage is that results are updated incrementally and continuously as streaming data keeps arriving.
Kafka has its own streams library, which is best suited to transforming data from one Kafka topic to another, whereas Spark streaming integrates with almost any type of system. For more detail you can refer to this blog.
In this blog I’ll cover an end-to-end integration of Kafka with Spark Structured Streaming, using Kafka as the source and a Spark Structured Streaming query as the consumer.
Let’s create a Maven project and add the following dependencies to pom.xml.
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>
<!-- Kafka source for Structured Streaming; Scala and Spark versions match the artifacts above -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
Now we will create a Kafka producer that produces messages and pushes them to a topic; the consumer will be a Spark Structured Streaming DataFrame.
First, set the properties for the Kafka producer.
import java.util.Properties

val props = new Properties()
// Broker address used for the initial connection
props.put("bootstrap.servers", "localhost:9092")
// Keys and values are both sent as strings
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
bootstrap.servers : A list of host/port pairs used for the initial connection to the Kafka cluster, in the form host1:port,host2:port,... and so on. It need not contain every broker, since the client discovers the rest of the cluster from these.
key.serializer : Serializer class for the key; it must implement the Serializer interface.
value.serializer : Serializer class for the value; it must implement the Serializer interface.
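When there is more than one broker, the host:port list can be assembled programmatically. The following is a minimal sketch using only the standard library; the object name ProducerConfigSketch, the helper producerProps, and the host names in the usage line are assumptions made for illustration and are not part of Kafka.

```scala
import java.util.Properties

object ProducerConfigSketch {
  // Hypothetical helper: builds producer properties from a list of
  // "host:port" broker addresses.
  def producerProps(brokers: Seq[String]): Properties = {
    require(brokers.nonEmpty, "at least one broker address is required")
    val props = new Properties()
    // bootstrap.servers expects a comma-separated host:port list
    props.put("bootstrap.servers", brokers.mkString(","))
    // Same string serializers as in the properties above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}
```

For example, ProducerConfigSketch.producerProps(Seq("host1:9092", "host2:9092")) yields a bootstrap.servers value of host1:9092,host2:9092.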
Creating a Kafka producer and sending messages to the topic.
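import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val topic = "mytopic" // the topic the Spark consumer subscribes to
val producer = new KafkaProducer[String, String](props)
// Send 11 records (count runs from 0 to 10 inclusive); send() is asynchronous
for (count <- 0 to 10)
  producer.send(new ProducerRecord[String, String](topic, "title " + count.toString, "data from topic"))
println("Message sent successfully")
// close() waits for previously sent records to complete before shutting down
producer.close()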
The send method is asynchronous: it returns as soon as the record has been stored in the buffer of records waiting to be sent. This allows many records to be sent in parallel without blocking to wait for the response after each one. The call returns a Future that completes with a RecordMetadata specifying the partition the record was sent to and the offset it was assigned. After sending the data, close the producer using the close method.
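If the partition and offset of a record are needed, the Future returned by send can be waited on. Below is a minimal sketch, assuming the kafka-clients dependency from the pom.xml; the object SendSketch and the helper sendAndWait are hypothetical names introduced only for this illustration. It takes the Producer interface rather than KafkaProducer so that it can be tried with any implementation.

```scala
import org.apache.kafka.clients.producer.{Producer, ProducerRecord, RecordMetadata}

object SendSketch {
  // Hypothetical helper: sends one record and blocks until it is acknowledged.
  def sendAndWait(producer: Producer[String, String],
                  topic: String,
                  key: String,
                  value: String): RecordMetadata = {
    val record = new ProducerRecord[String, String](topic, key, value)
    // send() returns a java.util.concurrent.Future; get() waits for the
    // result and throws if the send failed.
    producer.send(record).get()
  }
}
```

With the producer created earlier, val meta = SendSketch.sendAndWait(producer, "mytopic", "title 0", "data from topic") gives access to meta.partition() and meta.offset(). Blocking after every record gives up the throughput benefit of asynchronous sends, so this suits debugging more than bulk publishing.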
Kafka As Source
Now Spark will be the consumer of the stream published through Kafka. For this we need to create a Spark session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("sparkConsumer")
  .config("spark.master", "local")
  .getOrCreate()
To read from Kafka, subscribe to a particular topic by passing its name in the subscribe option. The following code subscribes to a Kafka topic and reads it as a stream using readStream.
val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load()
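The Kafka source accepts a few more options than the two used here. In particular, a streaming query starts from the latest offsets by default, so records published before the query starts are not read unless startingOffsets is set to earliest. Here is a minimal sketch of a helper that assembles these options as a plain map; the object KafkaSourceOptionsSketch, the helper kafkaSourceOptions, and its parameters are assumptions for illustration, while the option keys themselves are the ones Spark's Kafka source defines.

```scala
object KafkaSourceOptionsSketch {
  // Hypothetical helper: builds the option map for Spark's "kafka" source.
  def kafkaSourceOptions(brokers: Seq[String],
                         topics: Seq[String],
                         fromBeginning: Boolean): Map[String, String] = {
    require(brokers.nonEmpty && topics.nonEmpty, "brokers and topics must be non-empty")
    Map(
      // Comma-separated host:port list, passed through to the Kafka consumer
      "kafka.bootstrap.servers" -> brokers.mkString(","),
      // Comma-separated list of topics to subscribe to
      "subscribe" -> topics.mkString(","),
      // "earliest" reads existing records, "latest" only new ones
      "startingOffsets" -> (if (fromBeginning) "earliest" else "latest")
    )
  }
}
```

The resulting map can be passed in one call, for example spark.readStream.format("kafka").options(KafkaSourceOptionsSketch.kafkaSourceOptions(Seq("localhost:9092"), Seq("mytopic"), fromBeginning = true)).load().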
Printing the schema of the DataFrame with dataFrame.printSchema().
The schema output includes all the fields related to Kafka metadata.
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
Creating a Dataset from the DataFrame by casting the key and value of each record to strings.
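import org.apache.spark.sql.Dataset
import spark.implicits._ // provides the encoder needed by .as[(String, String)]

val dataSet: Dataset[(String, String)] = dataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]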
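The cast is needed because the producer's StringSerializer turns each string into bytes (UTF-8 by default), which is why key and value arrive as binary columns. The round trip can be sketched outside Spark with the standard library alone; the object CastSketch and its two helpers are hypothetical names used only to illustrate the encoding, not code from Spark or Kafka.

```scala
import java.nio.charset.StandardCharsets

object CastSketch {
  // Mirrors what StringSerializer does by default: string -> UTF-8 bytes
  def toBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Mirrors what the cast amounts to for such a column: UTF-8 bytes -> string
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}
```

A value such as "data from topic" survives the round trip unchanged, so the strings seen in Spark are the ones the producer sent, provided the producer kept the default encoding.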
Writing the Dataset to the console and keeping the program from exiting with the awaitTermination method.
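import org.apache.spark.sql.streaming.StreamingQuery

val query: StreamingQuery = dataSet.writeStream
  .outputMode("append")
  .format("console")
  .start()
// Block the main thread until the streaming query stops or fails
query.awaitTermination()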
The complete code is on my GitHub.
Hope you find this blog helpful!!
Happy Reading 🙂