Flink: Implementing the Count Window

In an earlier blog, we learned about tumbling and sliding windows, which are based on time. In this blog, we are going to learn how to define Flink’s windows on another property: the number of elements, i.e. the count window. As the name suggests, a count window is evaluated when the number of records received reaches a threshold.

A count window sets the window size based on how many entities exist within that window. For example, if we fix the count at 4, every window will have exactly 4 entities. It doesn’t matter how long the window is in terms of time: the duration of each window can differ, but the number of entities in it will always be the same. Count windows can be overlapping (sliding) or non-overlapping (tumbling). The count window in Flink is applied to keyed streams, which means there is already a logical grouping of the stream based on all values associated with a certain key. So the entity count applies on a per-key basis.

Count Window in Action

Consider a stream whose entities carry one of two keys, A and B. They are part of the same stream and are logically grouped by the value of their key. Now apply a count window operation to the keyed stream. The window is applied separately for each key, which means it moves over the entities of each key independently.

In this example, the entity count is 3, so each window covers 3 entities of the stream. The duration of a window is not fixed: a window is complete as soon as 3 entities for a key have been received.
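The per-key behaviour described above can be mimicked in a few lines of plain Java. This is only a sketch of the idea, not Flink code: the class name TumblingCountWindowSketch, the method tumblingCountWindows, and the convention that the first character of an event is its key are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration of a tumbling count window on a keyed stream (not Flink code).
class TumblingCountWindowSketch {

    // Buffers events per key and emits a window each time a key has collected `size` events.
    public static <T> List<List<T>> tumblingCountWindows(
            List<T> events, Function<T, String> keyOf, int size) {
        Map<String, List<T>> buffers = new HashMap<>();
        List<List<T>> fired = new ArrayList<>();
        for (T event : events) {
            List<T> buffer = buffers.computeIfAbsent(keyOf.apply(event), k -> new ArrayList<>());
            buffer.add(event);
            if (buffer.size() == size) {
                fired.add(new ArrayList<>(buffer)); // window complete for this key
                buffer.clear();                     // next window for this key starts empty
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical events: the first character is the key (A or B).
        List<String> events = Arrays.asList("A1", "B1", "A2", "A3", "B2", "A4", "B3");
        List<List<String>> windows = tumblingCountWindows(events, e -> e.substring(0, 1), 3);
        System.out.println(windows); // prints [[A1, A2, A3], [B1, B2, B3]]
    }
}
```

Note that A4 is never printed: it sits in an incomplete window for key A until two more A entities arrive.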

Count Window Example

Let’s write a Flink application for the word count problem and use a count window in the word count operation. We read the text stream from a socket using the Netcat utility and then apply transformations to it. First, a flatMap operator maps each word to a count of 1, like (word: 1). Then we create a keyed stream using the keyBy() method, specifying the word as the key. Finally, we apply a count window operation to the values associated with each key (word) and pass a count of 4 as the threshold for the number of entities in the window.

public static void main(String[] args) throws Exception {
    LOGGER.info("Count window word count example.");

    StreamExecutionEnvironment executionEnvironment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Read newline-delimited text from the socket (maxRetry of 6 seconds)
    DataStream<String> text = executionEnvironment
            .socketTextStream("localhost", 9000, "\n", 6);

    final DataStream<WordWithCount> windowCounts = text
            // Emit (word, 1) for every non-empty word in the line
            .flatMap((FlatMapFunction<String, WordWithCount>) (line, out) -> {
                for (String word : line.split("\\W+")) {
                    if (!word.isEmpty()) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            }, TypeInformation.of(WordWithCount.class))
            // Group by word so the window is applied per key
            .keyBy((KeySelector<WordWithCount, String>) wordWithCount -> wordWithCount.word,
                    TypeInformation.of(String.class))
            // Tumbling count window: fires once 4 elements have arrived for a key
            .countWindow(4)
            // Sum the counts of the elements in the window
            .reduce((ReduceFunction<WordWithCount>)
                    (a, b) -> new WordWithCount(a.word, a.count + b.count));

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1);

    executionEnvironment.execute("Socket Window WordCount");
}
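The WordWithCount class used above is not shown in this post. A minimal sketch of what it might look like follows, assuming only what the example needs: public word and count fields and a (String, long) constructor. The no-argument constructor and the toString() format are assumptions added for illustration.

```java
// Hypothetical holder for a word and its count, as used by the word count example.
// In a real Flink job this class would typically be declared public (in its own file
// or as a public static nested class) so that Flink can treat it as a POJO.
class WordWithCount {

    public String word;
    public long count;

    // No-argument constructor, which Flink's POJO rules expect.
    public WordWithCount() {
    }

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```

With this toString(), a completed window for the word "flink" would be printed as flink : 4.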

Run nc -l 9000, then run the Flink application. Also tail the Flink log to see the output we are getting:

tail -f log/flink--taskexecutor-.out

Result

Notice that when the count reaches 4 for any word, that word is written to the output, because the window is complete for that word (key). Nothing is displayed for a word until its count reaches 4. The window is applied on a per-key basis.

Above we used a tumbling count window of 4, which collects 4 events in a window and evaluates the window when the 4th element has been added. Similarly, we can use a sliding count window by passing an additional parameter, the number of elements after which the window is triggered.

    .countWindow(4, 2)

Above we defined a sliding count window with a size of 4 elements and a trigger interval of 2 elements.
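To see what a size of 4 with a trigger interval of 2 means for a single key, here is a plain-Java sketch (again, not Flink code). It assumes the window fires after every 2 new elements and is evaluated over at most the last 4 elements, which is my understanding of how Flink combines a count trigger with a count evictor for countWindow(size, slide). The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration of a sliding count window for one key (not Flink code).
class SlidingCountWindowSketch {

    // Fires every `slide` elements; each firing sees at most the last `size` elements.
    public static <T> List<List<T>> slidingCountWindows(List<T> events, int size, int slide) {
        Deque<T> buffer = new ArrayDeque<>();
        List<List<T>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (T event : events) {
            buffer.addLast(event);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` elements
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                fired.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingCountWindows(events, 4, 2));
        // prints [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

Under this assumption the first firing contains only 2 elements, because the window is triggered before 4 elements have accumulated, and consecutive windows overlap by 2 elements.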

You can find this sample Flink application here. Clone it, run it, and play around with it to see the behavior of the count window.

Happy Blogging!!
