Self-Learning Kafka Streams with Scala – #2

In our previous blog, Self-Learning Kafka Streams with Scala – #1, we saw how to create a simple KStream in Scala. In this blog, we will see how to transform a KStream and create a new stream from it.

But, before we get into the details of the KStream transformations, let’s take a look at the code:

    package com.knoldus.kafka.examples

    import java.util.Properties

    import org.apache.kafka.common.serialization._
    import org.apache.kafka.streams._
    import org.apache.kafka.streams.kstream.{KStreamBuilder, KeyValueMapper}

    /**
      * Copyright Knoldus Software LLP, 2017. All rights reserved.
      */
    object MapExample {
      def main(args: Array[String]): Unit = {
        // Default SerDes are String for both keys and values.
        val config = {
          val properties = new Properties()
          properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
          properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
          properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
          properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
          properties
        }

        val stringSerde = Serdes.String()
        val integerSerde = Serdes.Integer()

        val builder = new KStreamBuilder()
        val originalStream = builder.stream[String, String]("SourceTopic")

        // Map each record to (same key, length of the value).
        val mappedStream = originalStream.map[String, Integer] {
          new KeyValueMapper[String, String, KeyValue[String, Integer]] {
            override def apply(key: String, value: String): KeyValue[String, Integer] = {
              new KeyValue(key, new Integer(value.length))
            }
          }
        }
        // The values are Integers now, so the sink needs explicit SerDes.
        mappedStream.to(stringSerde, integerSerde, "SinkTopic")

        val streams = new KafkaStreams(builder, config)
        streams.start()
      }
    }

There are two major points to note here:

  1. Why are we passing a Java-style anonymous KeyValueMapper to the map function in Scala? The answer lies in Blog #1, where we mentioned that “Kafka Streams does not provide a Scala API”, which leaves us with no choice but to implement the Java functional interfaces ourselves, here as an anonymous class.
  2. Here we are providing the Serializer/Deserializer (SerDe) for “SinkTopic” explicitly. Why do we need that, when we didn’t in our previous blog’s example? The reason is that we have configured only the String SerDe as the default in the KafkaStreams “properties”. This leaves Kafka Streams with only one type of SerDe to work with, i.e., String, whereas the values written to “SinkTopic” are Integers and need an Integer SerDe.
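
The anonymous class from point 1 can be hidden behind a small helper so that the mapping logic reads like ordinary Scala. The following is only a minimal sketch, not part of the original example: `MapperSyntax` and `asMapper` are hypothetical names introduced here, and the code assumes the same kafka-streams dependency as the example above.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KeyValueMapper

// Hypothetical helper (not part of Kafka Streams): wraps a plain Scala
// function in the Java KeyValueMapper interface that KStream.map expects.
object MapperSyntax {
  def asMapper[K, V, R](f: (K, V) => R): KeyValueMapper[K, V, R] =
    new KeyValueMapper[K, V, R] {
      override def apply(key: K, value: V): R = f(key, value)
    }
}

object MapperSyntaxDemo {
  import MapperSyntax.asMapper

  // Same logic as the mapper in MapExample: keep the key, emit the value's length.
  val lengthMapper: KeyValueMapper[String, String, KeyValue[String, Integer]] =
    asMapper { (key: String, value: String) =>
      new KeyValue[String, Integer](key, Integer.valueOf(value.length))
    }

  def main(args: Array[String]): Unit = {
    val result = lengthMapper.apply("k1", "hello")
    println(s"${result.key} -> ${result.value}") // prints "k1 -> 5"
  }
}
```

Because `lengthMapper` has an explicit `KeyValueMapper` type, it could be passed to the stream in place of the anonymous class, e.g. `originalStream.map[String, Integer](lengthMapper)`.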

Finally, let’s start the Kafka server, run the example, and send some messages:
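
If you would rather send the test messages from Scala than from a console tool, a small producer along these lines might do. This is a hedged sketch and not part of the original post: the object name `SendMessages`, the keys, and the sample messages are made up for illustration, and it assumes a broker on localhost:9092 as configured in the example.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SendMessages {
  // Producer settings; the broker address matches the one used in MapExample.
  def producerProps: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProps)
    try {
      // Sample messages (illustrative only); each value's length should
      // appear as an Integer on SinkTopic once MapExample is running.
      Seq("hello", "kafka streams").zipWithIndex.foreach { case (text, i) =>
        producer.send(new ProducerRecord[String, String]("SourceTopic", s"key-$i", text))
      }
    } finally {
      producer.close() // waits for pending sends to complete before closing
    }
  }
}
```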

The result that we will receive is as follows:

For the consumer, we have to specify the “value.deserializer” property; otherwise, because the values in “SinkTopic” are serialized as Integers, we will receive the result in binary format.
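
To see why the deserializer matters, the sketch below serializes a value the same way the example's Integer SerDe does and then reads the bytes back both ways. It is an illustration under assumptions rather than part of the original post: `IntegerSerdeDemo` and its method names are hypothetical, and no broker is needed to run it. For the console consumer, the matching setting would typically be `value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer`.

```scala
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer}

object IntegerSerdeDemo {
  private val integerSerde = Serdes.Integer()

  // What gets written to SinkTopic for a value of length n: 4 big-endian bytes.
  def encode(n: Int): Array[Byte] =
    integerSerde.serializer().serialize("SinkTopic", Integer.valueOf(n))

  // Reading with the matching Integer deserializer recovers the number.
  def decodeAsInteger(bytes: Array[Byte]): Integer =
    integerSerde.deserializer().deserialize("SinkTopic", bytes)

  // Reading the same bytes as a String yields unprintable control characters.
  def decodeAsString(bytes: Array[Byte]): String =
    new StringDeserializer().deserialize("SinkTopic", bytes)

  def main(args: Array[String]): Unit = {
    val bytes = encode(5)
    println(bytes.mkString("[", ", ", "]")) // prints "[0, 0, 0, 5]"
    println(decodeAsInteger(bytes))         // prints "5"
    println(decodeAsString(bytes).length)   // prints "4"
  }
}
```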

So, this is how transformations are written in Kafka Streams with Scala. I hope you liked it and want to know about other operations in Kafka Streams, such as joins and aggregations.

The complete code can be downloaded from GitHub.

Please feel free to suggest or comment!


Written by 

Himanshu Gupta is a software architect with more than 9 years of experience. He is always keen to learn new technologies. He likes not only programming languages but Data Analytics too. He has sound knowledge of "Machine Learning" and "Pattern Recognition". He believes that the best results come when everyone works as a team. He likes coding, listening to music, watching movies, and reading science fiction books in his free time.
