If you work with huge amounts of data, you might have heard of Kafka. At a very high level, Kafka is a fault-tolerant, distributed publish-subscribe messaging system designed for fast data processing and the ability to handle hundreds of thousands of messages.
What is Stream Processing
Stream processing is the real-time processing of data continuously, concurrently, and in a record-by-record fashion.
Kafka has many applications, one of which is real-time processing.
Let us first understand what we actually do in real-time processing. Simply put, it involves a continuous stream of data on which some form of analysis is performed to extract useful information. In Kafka terms, real-time processing typically means reading data from a topic (the source), doing some analysis or transformation work, and then writing the results back to another topic (the sink). Currently, the possible choices for this kind of work are:
1. Writing your own custom code, using a KafkaConsumer to read the data and a KafkaProducer to write the results.
2. Using a full-fledged stream-processing framework such as Spark Streaming, Flink, or Storm.
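To make option 1 concrete, here is a minimal sketch of a hand-rolled consume-transform-produce loop using the plain Kafka clients. The topic names, group id, and the transform itself are illustrative placeholders, and the code assumes a recent kafka-clients dependency on the classpath:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConsumeTransformProduce {

    // The actual "analysis" step: here, just the length of each value.
    static long transform(String value) {
        return value.length();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consume-transform-produce");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            while (true) {
                // Read a batch from the source topic, transform each record,
                // and write the result to the sink topic.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("sink-topic",
                            record.key(), transform(record.value())));
                }
            }
        }
    }
}
```

Notice how much boilerplate this involves: you manage the poll loop, serialization, and error handling yourself, with no help for partitioning the work or recovering state.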
Now we will look at an alternative to all of the above options: Kafka Streams.
What is Kafka Streams
Kafka Streams is a library for building streaming applications, specifically applications that transform input Kafka topics into output Kafka topics (or call external services, update databases, and so on). Kafka Streams lets you do this with concise code in a way that is distributed and fault-tolerant.
Implementing Kafka Streams
A stream processing application built with Kafka Streams looks like this:
1) Provide the stream configuration:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "Streaming-QuickStart");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
2) Get the topics and Serdes:
String topic = configReader.getKStreamTopic();
String producerTopic = configReader.getKafkaTopic();
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
3) Build the stream and fetch the data:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> inputStreamData = builder.stream(stringSerde, stringSerde, producerTopic);
4) Process the stream:
// Map each value to its length; the cast to long matches the KStream<String, Long> type.
KStream<String, Long> processedStream = inputStreamData.mapValues(record -> (long) record.length());
Besides join and aggregation operations, there is a list of other transformation operations provided for KStream. Each of these operations may generate one or more KStream objects and can be translated into one or more connected processors in the underlying processor topology. All these transformation methods can be chained together to compose a complex processor topology.
Among these transformations, filter, map, and mapValues are stateless operations, to which users can pass a customized function as a parameter, such as a Predicate for filter or a KeyValueMapper for map.
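Outside of Kafka, the same per-record semantics can be sketched with plain java.util.function types: a Predicate plays the role of filter's argument and a Function plays the role of the mapValues mapper. This is only an illustration of the stateless, record-by-record idea, not the Kafka Streams API itself:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class StatelessTransformDemo {

    // filter-then-mapValues semantics on plain values:
    // keep values longer than three characters, then map each to its length.
    static List<Long> lengthsOfLongWords(List<String> values) {
        Predicate<String> longerThanThree = v -> v.length() > 3;  // filter's Predicate
        Function<String, Long> toLength = v -> (long) v.length(); // mapValues' mapper
        return values.stream()
                .filter(longerThanThree)
                .map(toLength)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengthsOfLongWords(Arrays.asList("kafka", "streams", "api")));
        // prints [5, 7] — "api" is dropped by the predicate
    }
}
```

Each record is handled independently of every other record, which is exactly what makes these operations stateless and trivially parallelizable.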
5) Write the stream back to Kafka:
processedStream.to(stringSerde, longSerde, topic);
At this point, the internal structures have been initialized, but processing has not started yet. You have to explicitly start the Kafka Streams thread by calling the start() method:
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
6) Close the stream.
When the application is done, call the close() method to stop it:
streams.close();
I hope this helps readers get a quick start with Kafka Streams. Happy coding 🙂