Flink: Time Windows based on Processing Time


In the previous blog, we talked about Flink’s window operator, the heart of processing infinite streams. In Flink, after specifying whether the stream is keyed or non-keyed, the next step is to define a window assigner, which determines how elements are assigned to windows. Flink provides several useful predefined window assigners, such as Tumbling windows, Sliding windows, Session windows, Count windows, and Global windows. We can use any of them as our use case requires, or we can create a custom window assigner.

In this blog, we will learn about the first two window assigners, i.e., tumbling and sliding windows. Both assign elements to windows based on time, which can be either processing time or event time.

  • Event time refers to processing streaming data based on the timestamps attached to each record. The timestamp is typically assigned and attached to the entity when the event is generated at its source.
  • Processing time refers to the system time of the machine (also known as “wall-clock time”) that is executing the respective operation. This is the time at which the operator actually processes the stream entity.
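
To make the two notions concrete, here is a minimal plain-Java sketch (not Flink code). The Event class and the helper names are hypothetical and exist only for illustration: event time is read from the record itself, while processing time is read from the clock of the machine running the operator.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

/** Illustrative sketch only; Event and the helper methods are hypothetical names. */
final class TimeNotionsSketch {

    /** A stream entity that carries the timestamp assigned at its source. */
    static final class Event {
        final String payload;
        final long eventTimeMillis;

        Event(String payload, long eventTimeMillis) {
            this.payload = payload;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** Event time: taken from the record, independent of when it is processed. */
    static long eventTime(Event event) {
        return event.eventTimeMillis;
    }

    /** Processing time: the wall-clock time of the machine executing the operation. */
    static long processingTime(Clock operatorClock) {
        return operatorClock.millis();
    }

    public static void main(String[] args) {
        // The event was generated at t = 5 s but reaches the operator at t = 25 s.
        Event event = new Event("click", 5_000L);
        Clock operatorClock = Clock.fixed(Instant.ofEpochMilli(25_000L), ZoneOffset.UTC);

        System.out.println("event time:      " + eventTime(event) + " ms");
        System.out.println("processing time: " + processingTime(operatorClock) + " ms");
    }
}
```

The fixed clock only keeps the example deterministic; a real operator would read the system clock, for example through Clock.systemUTC().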

In this blog, we are going to use processing time for our Flink application. Now let’s see the tumbling and sliding windows in action.

Tumbling Windows

A tumbling windows assigner assigns each element to exactly one window of a specified size. The windows have a fixed size measured in time and do not overlap. For example, with a window size of 20 seconds, a window includes all entities of the stream that arrive within a particular 20-second interval. An entity that belongs to one window doesn’t belong to any other tumbling window. The number of entities may differ from window to window, depending on the rate at which Flink receives them. The window tumbles over the data in a non-overlapping manner, so when a sum() operation is applied, each window emits the sum of only its own elements.
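
To make the non-overlapping assignment concrete, here is a minimal plain-Java sketch (not Flink’s actual implementation) of how a timestamp maps to exactly one tumbling window. It assumes non-negative timestamps and windows aligned to the epoch with no offset; the class and method names are hypothetical.

```java
import java.time.Duration;

/** Illustrative sketch: maps a timestamp to the single tumbling window that contains it. */
final class TumblingWindowSketch {

    /**
     * Returns the inclusive start of the window containing the timestamp.
     * Assumes timestampMillis >= 0 and windows aligned to the epoch (offset 0).
     */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(20).toMillis();

        // Processing time: the wall clock of the machine running this code.
        long now = System.currentTimeMillis();
        long start = windowStart(now, size);

        // The window covers [start, start + size); the end is exclusive.
        System.out.println("element at " + now + " -> window [" + start + ", " + (start + size) + ")");
    }
}
```

Because every timestamp has exactly one window start, two elements share a window only if they arrive within the same 20-second interval.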

Tumbling Windows Example

Let’s write a simple Flink application for the word count problem. In the application, we will use the tumbling window assigner, with windows based on processing time.

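import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// LOGGER and WordWithCount are defined elsewhere in the sample application.
public static void main(String[] args) throws Exception {
    LOGGER.info("Tumbling window word count example.");

    StreamExecutionEnvironment executionEnvironment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Read newline-delimited text from a local socket on port 9000.
    DataStream<String> text = executionEnvironment
            .socketTextStream("localhost", 9000, '\n', 6);

    final DataStream<WordWithCount> reduce =
            // Split each line into words and emit (word, 1) for every non-empty word.
            text.flatMap((FlatMapFunction<String, WordWithCount>)
                    (textStream, wordCountKeyPair) -> {
                for (String word : textStream.split("\\W+")) {
                    if (!word.isEmpty()) {
                        wordCountKeyPair.collect(new WordWithCount(word, 1L));
                    }
                }
            }, TypeInformation.of(WordWithCount.class))
            // Group the stream by word.
            .keyBy((KeySelector<WordWithCount, String>) wordWithCount -> wordWithCount.word,
                    TypeInformation.of(String.class))
            // Collect each key's elements into 10-second processing-time windows.
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            // Sum the counts of each word within a window.
            .reduce((ReduceFunction<WordWithCount>)
                    (a, b) -> new WordWithCount(a.word, a.count + b.count));

    // print the results with a single thread, rather than in parallel
    reduce.print().setParallelism(1);

    executionEnvironment.execute("Socket Window WordCount");
}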

In the above code snippet, we read the text stream from a socket (fed by the Netcat utility) and then apply transformations to it. First, we apply a flatMap operator that maps each word to a count of 1, like (word: 1). Then we create a keyed stream using the keyBy() method, specifying the word as the key. Finally, we apply a window operation to the values associated with each key (word).

We want a tumbling window based on processing time, which is why we use the TumblingProcessingTimeWindows class. The window size is 10 seconds, which means all entities that arrive within the same 10-second interval are included in one window. Finally, we apply a sum aggregation using a ReduceFunction over the entities in each window, which yields how often each word occurs within that 10-second interval.

You can find this sample Flink application here. Clone it, run it, and play with it to see the behavior of the tumbling window.
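
The snippet relies on a WordWithCount class that is not shown in this post. The following is only a guess at what such a class could look like, inferred from how the snippet uses it (a word field, a count field, and a two-argument constructor). The public fields and no-argument constructor are an assumption made so that Flink can treat it as a POJO, and the merge helper is a hypothetical stand-in for the reduce lambda.

```java
/** Illustrative sketch; the sample application's real WordWithCount may differ. */
final class WordWithCountSketch {

    /** Simple holder with public fields and a no-argument constructor. */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
            // Required for POJO-style serialization.
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }

    /** Same logic as the lambda passed to reduce(): keep the word, add the counts. */
    static WordWithCount merge(WordWithCount a, WordWithCount b) {
        return new WordWithCount(a.word, a.count + b.count);
    }

    public static void main(String[] args) {
        // Two occurrences of the same word inside one window collapse into one record.
        WordWithCount total = merge(new WordWithCount("flink", 1L), new WordWithCount("flink", 1L));
        System.out.println(total);
    }
}
```

Within a window, Flink calls the reduce function repeatedly for each key, so a word seen n times ends up with a count of n.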

Sliding Windows

The sliding window assigner is similar to the tumbling window assigner: the size of the windows is fixed and measured in time. The important difference is that it allows an entity to be present in more than one window. An additional window slide parameter controls how frequently a new sliding window is started. Hence, sliding windows overlap if the slide is smaller than the window size. With a sum() operation applied to each window, the window slides over the data in an overlapping manner, so a single element can contribute to several sums.

Some of the entities are present in more than one window, i.e., the windows overlap, because a fixed-size window slides over the stream of data at a fixed time interval.
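
The overlap can be sketched in plain Java (again, not Flink’s actual code). The sketch assumes non-negative timestamps, windows aligned to the epoch, and, purely as an example, a 30-second window with a 10-second slide; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: lists every sliding window that contains a given timestamp. */
final class SlidingWindowSketch {

    /**
     * Returns the start of each window [start, start + size) containing the timestamp,
     * newest window first. Assumes timestampMillis >= 0 and an offset of 0.
     */
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // The most recent window that has started at or before the timestamp.
        long lastStart = timestampMillis - (timestampMillis % slideMillis);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 30_000L;   // example window size: 30 seconds
        long slide = 10_000L;  // example slide: 10 seconds

        for (long start : windowStarts(35_000L, size, slide)) {
            System.out.println("window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

For a timestamp of 35 s this yields three windows, [30 s, 60 s), [20 s, 50 s), and [10 s, 40 s), which matches size / slide = 3. When the slide equals the size, the loop returns a single window and the behavior reduces to a tumbling window.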

Sliding Window Example

In the same word count application, we can use the sliding window assigner, again with windows based on processing time. The only change needed is to use SlidingProcessingTimeWindows and pass an extra slide interval:

.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))

Above, we provide a window size of 30 seconds and a slide interval of 10 seconds. You can find this sample Flink application here. Clone it, run it, and play with it to see the behavior of the sliding window.

One use case where sliding windows are useful is tracking error messages from a website. For example, we may want to see how the error count changes over time, and whether the errors gradually disappear after a patch or fix is applied to the website.

In the next blog, we will explore the Count window assigner and the Session window assigner.

Keep Learning!!
