Kafka Streams

Reading Time: 2 minutes

What are Streams

A stream is an unbounded, continuous flow of data records arriving in real time. The records are generally produced as key-value pairs. Producers push these records automatically, which means there is no need for consumers to request them.

What are Kafka Streams

Kafka Streams is one of the projects of the Apache Kafka community. It is a client library for building data pipelines and microservices, in which the input and output data are stored in Kafka clusters. It provides real-time stream processing on top of the Kafka consumer client and makes it easy to transform or filter data from one Kafka topic and publish it to another.

Kafka Streams API

The Kafka Streams API provides data parallelism, fault tolerance and scalability. It treats messages as an unbounded, continuous, real-time flow of records with the following characteristics:

  • Uses a single stream for consuming and producing.
  • Does not support batch processing.
  • Supports stateless as well as stateful operations (see the sketch after this list).
  • Has no external dependency on any system other than Kafka itself.
  • Processes one record at a time to achieve millisecond processing latency.
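
To make the stateless/stateful distinction concrete, here is a minimal sketch using the Scala DSL that ships with the kafka-streams-scala dependency introduced in the next section. The topic names and the word-count logic are only illustrative, and the snippet assumes the same 2.8.0 library as the rest of this post.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder
val lines: KStream[String, String] = builder.stream[String, String]("Text_Lines_Topic")

// Stateless: each record is processed on its own, no state store is needed.
val nonEmptyLines: KStream[String, String] = lines.filter((_, line) => line.nonEmpty)

// Stateful: counting per key needs a local state store, which Kafka Streams
// creates and backs with a changelog topic automatically.
val wordCounts: KTable[String, Long] = nonEmptyLines
  .flatMapValues(_.toLowerCase.split("\\W+"))
  .groupBy((_, word) => word)
  .count()

wordCounts.toStream.to("Word_Counts_Topic")

Even the stateful case needs nothing beyond Kafka itself: the state store and its changelog topic are managed by the library, in line with the characteristics listed above.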

Dependencies

To implement the examples, we simply add the Kafka Streams Scala library to build.sbt:

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.8.0"

Code

To write a Kafka Streams application, first import the libraries mentioned below.

import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

Also, set up some of the configuration properties required by the streams application. The default key and value serde classes come from the stringSerde implicit brought in by the Serdes import above.

val streamsConfiguration = new Properties()
streamsConfiguration.put(APPLICATION_ID_CONFIG, "Kafka-Streams-Example")
streamsConfiguration.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
streamsConfiguration.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass.getName)
streamsConfiguration.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass.getName)

First, create an instance of StreamsBuilder. The builder object has a stream method that takes an input topic name and returns a KStream instance. Various methods such as mapValues and join can then be applied to the KStream; each returns another KStream, which can finally be written to an output Kafka topic (a join sketch follows the code below).

val builder = new StreamsBuilder
// Read the input topic as a stream of String keys and String values
val kStream = builder.stream[String, String]("Input_Topic")
// Transform every value and write the result to the output topic
val upperCaseKStream = kStream.mapValues(_.toUpperCase)
upperCaseKStream.to("Output_Topic")
// Build the topology and start the streams application
val stream = new KafkaStreams(builder.build(), streamsConfiguration)
stream.start()
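
The snippet above only uses the stateless mapValues operation. As a rough sketch of the join mentioned earlier, and assuming the same imports and builder (with these streams added before builder.build() is called), a windowed stream-stream join might look like this; the topic names and the five-minute window are arbitrary choices for illustration:

import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows

// Two streams with the same key type, read from hypothetical topics
val orders = builder.stream[String, String]("Orders_Topic")
val payments = builder.stream[String, String]("Payments_Topic")

// Join records that share a key and arrive within five minutes of each other
val joined = orders.join(payments)(
  (order, payment) => s"$order | $payment",
  JoinWindows.of(Duration.ofMinutes(5))
)
joined.to("Orders_With_Payments_Topic")

Like count, a windowed join is a stateful operation, so Kafka Streams maintains windowed state stores for both sides of the join.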

Written by 

Amarjeet is a Software Consultant at Knoldus Software LLP. He has an overall experience of 2 years and 10 months. He has completed his Bachelor of Technology in Computer Science from National Institute of Technology, Hamirpur, Himachal Pradesh. He likes reading books, travelling and trekking.