What are Streams
A stream is an unbounded, continuous flow of data packets delivered in real time. The packets are generally produced as key-value pairs. The producer sends them on its own as they are generated, so there is no need to place a request for each one.
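The idea can be sketched in plain Scala, without Kafka. This is only an illustration: the Record type, the sensorReadings source, and the sensor names are hypothetical and were invented for this example.

```scala
object StreamSketch {
  // Hypothetical record type: one key-value pair per data packet.
  final case class Record(key: String, value: Long)

  // An unbounded source: the iterator never ends, so a caller
  // takes only as many records as it wants to handle.
  def sensorReadings: Iterator[Record] =
    Iterator.from(0).map(i => Record(s"sensor-${i % 3}", i.toLong * 10))

  def main(args: Array[String]): Unit = {
    // Records are handled one at a time, in arrival order;
    // here only the first five are consumed.
    sensorReadings.take(5).foreach(r => println(s"${r.key} -> ${r.value}"))
  }
}
```

The iterator stands in for the producer: it keeps yielding records for as long as the consumer keeps reading.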
What are Kafka Streams
Kafka Streams is a project of the Apache Kafka community. It is a client library for building data pipelines and microservices whose input and output data are stored in Kafka clusters. It provides real-time stream processing on top of the Kafka producer and consumer clients, which makes it easier to transform or filter data from one Kafka topic and publish the result to another Kafka topic.

Kafka Streams API
The Kafka Streams API provides data parallelism, fault tolerance, and scalability. It treats messages as an unbounded, continuous, real-time flow of records, with the following characteristics:
- A single stream handles both consuming and producing.
- Does not support batch processing.
- Supports stateless and stateful operations.
- No external dependency on any system other than Kafka itself.
- Processes one record at a time to achieve millisecond processing latency.
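The difference between stateless and stateful operations can be sketched as follows. This is a minimal sketch, assuming the kafka-streams-scala 2.8.0 dependency from the Dependencies section; the object name and the topic names are placeholders chosen for illustration.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
import org.apache.kafka.streams.scala.serialization.Serdes._

object StatelessVsStateful {
  // Topic names below are assumptions made for this sketch.
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder
    val lines: KStream[String, String] = builder.stream[String, String]("Input_Topic")

    // Stateless: each record is judged on its own and nothing is remembered.
    val nonBlank: KStream[String, String] = lines.filter((_, value) => value.trim.nonEmpty)
    nonBlank.to("Filtered_Topic")

    // Stateful: a running count per key needs a state store,
    // which Kafka Streams creates and maintains for the application.
    val countsPerKey: KTable[String, Long] = nonBlank.groupByKey.count()
    countsPerKey.toStream.to("Counts_Topic")

    builder.build()
  }
}
```

The filter step can run on any record in isolation, while the count must keep the previous total for each key, which is why only the second step involves state.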
Dependencies
To implement the examples, add the Kafka Streams Scala API to build.sbt:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.8.0"
Code
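To write a Kafka Streams application, first import the classes below. The Scala API in kafka-streams-scala 2.8.0 provides StreamsBuilder, and its implicit conversions and serdes need to be in scope:
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._
Next, set the configuration properties that the streams application requires:
val streamsConfiguration = new Properties()
streamsConfiguration.put(APPLICATION_ID_CONFIG, "Kafka-Streams-Example")
streamsConfiguration.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
streamsConfiguration.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
streamsConfiguration.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
Then create an instance of StreamsBuilder. Its stream method takes the input topic name and returns a KStream. Methods such as map and join can be applied to that KStream; each returns another KStream, which can be written to an output Kafka topic. Finally, build the topology, pass it to KafkaStreams together with the configuration, and start the application.
val builder = new StreamsBuilder
// Read the input topic as a stream of String keys and String values
val kStream: KStream[String, String] = builder.stream[String, String]("Input_Topic")
// Convert every value to upper case; keys are left unchanged
val upperCaseKStream = kStream.mapValues(_.toUpperCase)
upperCaseKStream.to("Output_Topic")
// builder.build() produces the Topology that KafkaStreams executes
val stream = new KafkaStreams(builder.build(), streamsConfiguration)
stream.start()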
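A running streams application keeps its threads alive until it is closed, so it is common to close it when the JVM exits. The helper below is a minimal sketch of that pattern; the object name, the method name, and the 10-second timeout are assumptions made for this example.

```scala
import java.time.Duration
import org.apache.kafka.streams.KafkaStreams

object StreamsLifecycle {
  // Registers a JVM shutdown hook that closes the application
  // (for example on Ctrl+C), then starts it.
  // The 10-second close timeout is an arbitrary choice for this sketch.
  def startWithShutdownHook(streams: KafkaStreams): Unit = {
    sys.addShutdownHook {
      streams.close(Duration.ofSeconds(10))
    }
    streams.start()
  }
}
```

With this helper, the last line of the example could become StreamsLifecycle.startWithShutdownHook(stream) in place of stream.start().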