
In our previous blog posts:
- Flinkathon: Why Flink is better for Stateful Streaming applications?
- Flinkathon: What makes Flink better than Kafka Streams?
We saw why Apache Flink is a better choice for streaming applications. In this blog post, we will explore how easy it is to express a streaming application using Apache Flink’s DataStream API.
DataStream API
The DataStream API is used to write programs that apply transformations to data streams, such as filtering, updating state, defining windows, and aggregating. A data stream can be read from various sources, such as message queues, socket streams, and files. The results are returned via sinks, which may, for example, write the data to files or to standard output (for example, the command-line terminal).
For example, suppose you have a stream of words coming in from a message queue like Kafka, and you want to calculate the count of each word. To write this application with Apache Flink, we take the following steps:
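The source, transformation, and sink pattern can be illustrated with a minimal sketch that needs no external system. It assumes Flink's Scala streaming API is on the classpath; the object name, the `isLong` helper, and the sample words are hypothetical and only serve the illustration.

```scala
import org.apache.flink.streaming.api.scala._

object SourceTransformSinkSketch {
  // Plain predicate used by the filter step (hypothetical helper)
  def isLong(word: String): Boolean = word.length > 3

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a fixed set of elements (hypothetical sample data)
    val words: DataStream[String] =
      env.fromElements("flink", "is", "a", "stream", "processor")

    // Transformations: keep the long words, then upper-case them
    val result: DataStream[String] =
      words.filter(w => isLong(w)).map(_.toUpperCase)

    // Sink: standard output
    result.print()

    // Nothing runs until execute() is called
    env.execute("Source-Transform-Sink Sketch")
  }
}
```

Swapping `fromElements` for a Kafka source and `print()` for another sink leaves the transformation steps in the middle unchanged.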
Add Dependencies
To use Apache Flink’s DataStream API, we first have to add the following dependencies to our project (SBT):
"org.apache.flink" %% "flink-scala" % "1.7.2"
"org.apache.flink" %% "flink-streaming-scala" % "1.7.2"
"org.apache.flink" %% "flink-connector-kafka" % "1.7.2"
The first dependency provides Flink's Scala API, which we need to write a Flink application in Scala. The second provides the Scala DataStream API, and the last one provides the Flink Kafka connector, which we use to read a data stream from Kafka.
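For context, these three lines could sit in a build file like the following sketch. The project name, project version, and Scala version are assumptions; Flink 1.7.2 artifacts are published for Scala 2.11 and 2.12, so `scalaVersion` should be one of those.

```scala
// build.sbt (sketch; name, version and scalaVersion are assumptions)
name := "flink-wordcount"
version := "0.1"
scalaVersion := "2.11.12"

// Keep all Flink modules on the same version
val flinkVersion = "1.7.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)
```

Declaring the version once in `flinkVersion` avoids mixing module versions by accident.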
Write Application
Now we are all set and can proceed to write the streaming application using Flink. The first step is to set up the execution environment:
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
The next step is to set up the Kafka consumer, the source from which the data will be consumed:
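import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// prepare Kafka consumer properties
val kafkaConsumerProperties = new Properties
// "zookeeper.connect" is only required by the legacy Kafka 0.8 connector
kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
kafkaConsumerProperties.setProperty("group.id", "flink")
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
// set up Kafka Consumer: topic name, deserialization schema, properties
val kafkaConsumer = new FlinkKafkaConsumer[String](
  "input", new SimpleStringSchema, kafkaConsumerProperties)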
Here we have provided the name of the Kafka topic to consume from, the deserialization schema used to deserialize the messages from Kafka, and the Kafka cluster’s location and configuration.
Next, we need to transform the messages from Kafka into the desired result, i.e., a word count. For that we write the following code:
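To make the role of the deserialization schema concrete: Kafka hands over each record value as raw bytes, and `SimpleStringSchema` turns those bytes into a `String` (UTF-8 by default). The following plain-Scala sketch shows only that idea and is not Flink's implementation; the object name, the `deserialize` helper, and the sample message are hypothetical.

```scala
import java.nio.charset.StandardCharsets

object StringDeserializationSketch {
  // Turn the raw bytes of one Kafka record value into a String
  def deserialize(message: Array[Byte]): String =
    new String(message, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // Hypothetical record value, encoded the way a producer would send it
    val rawValue: Array[Byte] = "how are you".getBytes(StandardCharsets.UTF_8)
    println(deserialize(rawValue)) // prints "how are you"
  }
}
```

If the topic carried JSON or Avro instead of plain text, a different deserialization schema would be passed to the consumer in place of `SimpleStringSchema`.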
// get text from Kafka
val text = env.addSource(kafkaConsumer)
val counts: DataStream[(String, Int)] = text
  // split up the lines into pairs (2-tuples) containing: (word, 1)
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(0)
  .sum(1)
In the above code, we split every message into words and mapped each word to a tuple of (word, 1). Then we grouped the tuples by word and summed them up to get the count.
Finally, we have to provide a sink so that Flink can emit the result. Here we use standard output, i.e., the command-line terminal, and then execute the job:
// emit result
counts.print()
// execute program
env.execute("Streaming WordCount")
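The transformation chain above can also be checked without Kafka or a running Flink job. The following is a minimal plain-Scala sketch, not Flink code: `tokenize` mirrors the `flatMap` and `filter` steps, and `runningCounts` imitates how `keyBy(0).sum(1)` emits an updated count for every incoming word. The object name, the helper names, and the input lines are assumptions made for illustration.

```scala
object WordCountSketch {
  // Same splitting rules as the Flink job:
  // lower-case, split on non-word characters, drop empty strings
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toList

  // Imitates keyBy(0).sum(1): each incoming word emits its updated running count
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = lines.flatMap(tokenize).foldLeft(start) {
      case ((totals, out), word) =>
        val count = totals.getOrElse(word, 0) + 1
        (totals.updated(word, count), out :+ (word -> count))
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input messages
    runningCounts(Seq("How are you?", "Are you ready")).foreach(println)
  }
}
```

For this sample input the sketch prints (how,1), (are,1), (you,1), (are,2), (you,2), (ready,1), one per line. In a real job with parallelism greater than 1, `print()` prefixes each line with a subtask index, and lines for different words may interleave.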
Now, we just have to push some data into the Kafka topic and see the results of the program:
Note that the word “you” arrived twice, hence its count is 2.
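One way to push test data into the topic is a small producer program. The sketch below assumes the plain Kafka client library (`kafka-clients`) is on the classpath, that a broker is reachable at `localhost:9092`, and that the topic is named `input` as in the consumer setup. The object name, the `producerProperties` helper, and the two messages are hypothetical.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object InputProducerSketch {
  // Producer settings pointing at the same broker the Flink job reads from
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val producer =
      new KafkaProducer[String, String](producerProperties("localhost:9092"))
    try {
      // Hypothetical test messages; "you" appears in both
      Seq("how are you", "you are welcome").foreach { line =>
        producer.send(new ProducerRecord[String, String]("input", line))
      }
      // Make sure buffered records are sent before shutting down
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

The console producer script that ships with Kafka works just as well for a quick manual test.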
Conclusion
In short, we have covered Apache Flink’s DataStream API and its ability to process a data stream from Kafka. To download the full source code, please visit the link: Flinkathon GitHub.
In the upcoming blogs, we will explain the state management feature of Flink, with which you can keep the results safe so that they are not lost in case of a failure or maintenance. So, stay tuned 🙂


