Self-Learning Kafka Streams with Scala – #1


A few days ago, I ran into a situation where I needed to perform a stateful operation on streaming data. So, I started looking for possible solutions and found several, built on different technologies such as Spark Structured Streaming, Apache Flink, and Kafka Streams.

All of them solved my problem, but I selected Kafka Streams because it met most of my requirements. After that, I started reading its documentation and trying to run its examples. But as soon as I started learning it, I hit a major roadblock: “Kafka Streams does not provide a Scala API!” I was shocked to learn that.

I was expecting Kafka Streams to have a Scala API because I am building my application in Scala, and a native API would have made it easy to include Kafka Streams in it. But that didn’t turn out to be the case. On top of that, when I searched for Scala examples, I was able to find only a handful of them.

So, I decided to learn it on my own and my first step was to build a “Hello World!” program using Kafka Streams and Scala, like this:


package com.knoldus.kafka.examples

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.KStreamBuilder

/**
  * Copyright Knoldus Software LLP, 2017. All rights reserved.
  */
object StreamApplication {
  def main(args: Array[String]): Unit = {
    // Streams configuration: application id, broker address, and default String serdes.
    val config = {
      val properties = new Properties()
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
      properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties
    }

    // Read every record from "SourceTopic" and forward it unchanged to "SinkTopic".
    val builder = new KStreamBuilder()
    val sourceStream = builder.stream[String, String]("SourceTopic")
    sourceStream.to("SinkTopic")

    // Build the topology and start processing.
    val streams = new KafkaStreams(builder, config)
    streams.start()
  }
}

Before running this example, we need to start a Kafka server. To do that, you can follow the Kafka quick start guide. After that, send some messages to the Kafka topic “SourceTopic” and start a Kafka consumer on the Kafka topic “SinkTopic”.
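If you would rather send the test messages from Scala than from a console tool, a minimal sketch could look like the one below. It assumes the kafka-clients library is on the classpath and a broker is listening on localhost:9092, as in the example above. The object name, the `producerConfig` helper, and the sample messages are made up for illustration.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper object, not part of the original example.
object SourceTopicProducer {

  // Builds plain String-to-String producer settings for the given broker address.
  def producerConfig(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerConfig("localhost:9092"))
    try {
      // Sample values only; each one is sent to "SourceTopic" without a key.
      Seq("hello", "kafka", "streams").foreach { message =>
        producer.send(new ProducerRecord[String, String]("SourceTopic", message))
      }
    } finally {
      // close() waits for records that are still in flight before releasing resources.
      producer.close()
    }
  }
}
```

Run it once the broker is up; the stream application can then pick the messages up from “SourceTopic”.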

Now, run the example and you will see that the Kafka consumer on the topic “SinkTopic” receives the messages.
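To check the result from Scala instead of a console consumer, something like the following sketch could be used. It again assumes kafka-clients on the classpath and a broker on localhost:9092. The object name, the `consumerConfig` helper, and the group id are hypothetical. It uses `poll(long)`, which matches the client versions of the `KStreamBuilder` era; newer clients provide `poll(java.time.Duration)` instead.

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

// Hypothetical helper object, not part of the original example.
object SinkTopicConsumer {

  // Builds String-to-String consumer settings; "earliest" lets a new group
  // read messages that were written before the consumer started.
  def consumerConfig(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def main(args: Array[String]): Unit = {
    // "sink-topic-reader" is an arbitrary group id chosen for this sketch.
    val consumer = new KafkaConsumer[String, String](consumerConfig("localhost:9092", "sink-topic-reader"))
    consumer.subscribe(Collections.singletonList("SinkTopic"))
    try {
      // Polls until the process is stopped, printing each forwarded value.
      while (true) {
        val records = consumer.poll(1000L)
        records.asScala.foreach(record => println(record.value()))
      }
    } finally {
      consumer.close()
    }
  }
}
```

Stop it with Ctrl+C once the forwarded messages have appeared.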

This means that now we are able to send messages from one Kafka topic to another via Kafka Streams.

So, this was my first step in learning Kafka Streams with Scala. I know it is not much, and I still need to explore more of Kafka Streams, such as transformations, joins, and aggregations, which I will be writing about in future posts. So, stay tuned 🙂

The complete code can be downloaded from Github.

Please feel free to suggest or comment!


Written by 

Himanshu Gupta is a software architect with more than 9 years of experience. He is always keen to learn new technologies. He likes not only programming languages but data analytics too. He has sound knowledge of "Machine Learning" and "Pattern Recognition". He believes that the best results come when everyone works as a team. He likes coding, listening to music, watching movies, and reading science fiction books in his free time.
