In our previous blog – Self-Learning Kafka Streams with Scala – #1, we saw how to create a simple KStream in Scala. In this blog, we will see how to transform a KStream and create a new Stream from it.
But, before we get into the details of the KStream transformations, let’s take a look at the code:
Now, there are 2 major points to be noted down here:
- Why are we using anonymous functions of Java in (Scala) map function? The answer lies in Blog #1, where we mentioned that “Kafka Streams does not provide a Scala API”, which leaves us with no choice but to use Java 8 anonymous functions.
- Here we are providing Serializer/De-Serializer(SerDe) for “SinkTopic” explicitly. Now, why do we need that? Since in our previous blog’s example we didn’t do that. The reason is, we have given String SerDe in KafkaStreams “properties”. This leaves Kafka Streams with only one type of SerDe to work with, i.e., String. Whereas, we need an Integer SerDe for “SinkTpic”.
At last, let’s start the Kafka server, run the example and send some messages:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SourceTopic hello world!
The result that we will receive is as follows:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --topic SinkTopic 12
For the consumer, we have to specify the “value.deserializer” property, otherwise we will receive the result in binary format.
So, this is how transformations are written in Kafka Streams with Scala. I hope you liked it and wanted to know about other operations in Kafka Streams like joins, aggregations, etc.
The complete code can be downloaded from Github.
Please feel free to suggest or comment!