Getting close to Apache Flink, albeit in a Träge manner – 2


From the preceding post in this series

As a stream of events enters a Flink-based application, we can apply a CountWindow transformation to it (there are many such transformations that Flink offers; we will meet them as we go). CountWindow allows us to create a group of arriving events and use that group for some application-specific action (in the last blog, we found the maximum temperature in each group of events). As long as events keep arriving, we continue to receive a series of maximum temperatures and can, perhaps, take some further action on them.
To put it differently, we want to look at the events in terms of their count. We don’t care when the events enter our system or when they actually occur. It is quite possible that between the 2nd and 3rd event in a count-based window, the time lag is considerable. In other words, the time it takes for a window of count – say ‘5’ – to form may be arbitrary; yet we don’t really care. All we want is a collection of 5 events at our disposal before we proceed to make some meaning of them.
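As a plain-Scala analogy (not the Flink API), grouping a sequence of readings into batches of 5 and taking the maximum of each batch captures what the CountWindow of the last blog did; the temperature values here are made up purely for illustration:

```scala
// Plain-Scala analogy of a CountWindow of size 5 (illustrative values, not real data):
// form batches of 5 readings and pick the maximum temperature in each batch.
val temperatures = List(27.3, 29.1, 25.6, 30.2, 28.4, 26.9, 31.0, 27.8, 29.5, 30.7)
val maxPerBatch = temperatures.grouped(5).map(_.max).toList
println(maxPerBatch) // List(30.2, 31.0)
```

However long those ten readings take to arrive, the result is the same: only the count matters.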

A case for time, rather than count

It is important to recall the notion of 'discretization' that I mentioned in the preceding blog. Arriving events form a continuous stream while, to be able to carry out meaningful computations, we need a discretized view of the same. Time and Count give us the conceptual planks on which we can base the mechanism of discretization. Rather obviously, while dealing with TimeWindows, Flink uses 'Time' as the basis of discretization.

Flink is responsive to three 'notions of time' (from this wiki page):
  • Processing time
This is the wall-clock time of the machine where the transformations are taking place. We don’t care when an event comes into being; we are only interested in when the machine (running Flink’s transformations) processes it. While this is perhaps the simplest notion to understand, it is also important to bear in mind that, in a distributed environment, it is not known beforehand which machine is going to process an event and hence what timestamp will be associated with it.
  • Event time
Event time is the time when the event comes into being, not when it enters Flink. Typically, such a time is only known to the source that generates the event, which is responsible for attaching it to the event. While processing the event inside Flink, we can tap into and use it. Because the event carries its own timestamp, it is possible to handle an event that arrives out of order (later than events that were generated after it).
  • Ingestion time
Ingestion time is what its name indicates: the time at which the event is ingested into Flink. It is what the wall-clock reads at the point when an event is ready at the source and is about to enter Flink’s operators. Put differently, an ingestion timestamp is assigned by Flink just before the event begins its journey along Flink’s processing pipeline. It is more predictable than Processing Time mentioned earlier and is interpreted consistently even when the processing is distributed. In the words of the (fantastic) blog of Fabian Hueske, it is ‘a hybrid of processing and event time’.

Armed with these three distinct notions of time offered by Flink, let’s proceed.

We are using the sample dataset obtained from pubnub, as we have done earlier. This time, we are going to consider the timestamp that is part of the data we read. Here’s a row from the file sampleIOT.csv:


The 2nd field from the right is the timestamp: it indicates the time at which this reading event was generated by pubnub; the rightmost field is the reading for ambient temperature. We are going to instruct Flink to use this timestamp to determine which window the event belongs to. The notion of time that we are resorting to is Event Time from above.

   val env = StreamExecutionEnvironment.createLocalEnvironment(1)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   val readings =
     readIncomingReadings(env, "./sampleIOT.csv") // assumed helper that reads and parses the CSV
       .map(e => (e.timeStamp, e.ambientTemperature))
       .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)

We are instructing Flink to interpret the incoming events’ timestamps as Event Time. Also, we are calling assignAscendingTimestamps() to indicate that the first field of the pair should be used as the timestamp and that these timestamps can be assumed to be in ascending order (the CSV file is duly sorted on the timeStamp field before being fed to Flink).

Then, we are creating a timeWindow which spans a period of 3 milliseconds. So, Flink takes in the records (events) that are coming in and keeps an eye on the timeStamp field of each of them. A record whose timeStamp falls within the first 3-millisecond block qualifies to be bunched with others in that window. That bunch is then given to maxBy(), and the maximum ambient temperature amongst the readings in that bunch is picked up.
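The way a timestamp maps to its tumbling window can be sketched in plain Scala; this mirrors epoch-aligned window assignment (Flink's default alignment is an assumption stated here, not something our pipeline configures explicitly):

```scala
// For a tumbling window of `size` milliseconds (aligned to the epoch),
// an event with timestamp `ts` falls into exactly one window [start, start + size).
def tumblingWindow(ts: Long, size: Long): (Long, Long) = {
  val start = ts - (ts % size)
  (start, start + size)
}
println(tumblingWindow(2L, 3L)) // (0,3): the first 3-ms window
println(tumblingWindow(4L, 3L)) // (3,6): the second 3-ms window
```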
Let’s try and understand the way Flink creates the timeWindow. Here’s a portion of the data file:

In the first window (of 3 milliseconds), the highest ambient temperature is 30.02.
In the second window (of 3 milliseconds), the highest ambient temperature is 27.83.

And so on. When our small program is run using the actual data file (sampleIOT.csv), Flink generates the following output:


For good measure, if we create a timeWindow of 1 millisecond, thus:

   val readings =
     readIncomingReadings(env, "./sampleIOT.csv") // assumed helper that reads and parses the CSV
       .map(e => (e.timeStamp, e.ambientTemperature))
       .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
       .timeWindowAll(Time.milliseconds(1))
       .maxBy(1)

Flink gives us this:


Effectively, we have been able to determine the highest ambient temperature for every 1-millisecond time span.

At this point, I have not been able to figure out how Flink decides the order in which tuples are printed. Whenever I do, I will update the blog.

Sliding Time Window

The timeWindow that we have used here is a tumbling window. We recall from the earlier blog post that once we apply our computation (here, maxBy()) to its contents, a tumbling window gets rid of (tumbles) its contents, consigning them to oblivion.
As one expects, there is a variant of timeWindow which behaves as a sliding window. We can put it to use, thus:

   val readings =
     readIncomingReadings(env, "./sampleIOT.csv") // assumed helper that reads and parses the CSV
       .map(e => (e.timeStamp, e.ambientTemperature))
       .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
       .timeWindowAll(Time.milliseconds(3), Time.milliseconds(1))
       .maxBy(1)

When our small program is run using the actual data file (sampleIOT.csv), Flink generates the following output:
To understand how Flink generates this output, let us use a smaller dataset (not what sampleIOT.csv contains). We consider a series of pairs of values; each pair consists of (timeStamp in ms, temperature recorded in degrees):

Remember that we are using a sliding timeWindow(3,1), the applicable ‘notion of time’ being Event Time, and instructing Flink to use the timeStamp of each pair as an ascending event-time timestamp.
timeWindow sliding this way ->
timeStamps == 1
Flink prints: 21.05
timeStamps == 1,2
Flink prints: 21.05
timeStamps == 1,2,3
First timeWindow created, Flink prints: 21.05
timeStamps == 2,3,4
Second timeWindow created (after slide), Flink prints: 29.05
timeStamps == 3,4,5
Third timeWindow created (after slide), Flink prints: 29.05
timeStamps == 4,5
Fourth timeWindow created (after slide), Flink prints: 29.05
timeStamps == 5
Fifth and final timeWindow created (after slide), Flink prints: 26.05

This may not be the best representation of what really happens, but I hope that this provides sufficient visualization of the same! 🙂

As we observe, in a sliding window, events are grouped according to their timestamps into windows spanning 3 milliseconds in this case. Also, an event can belong to more than one window, because the window slides progressively, by 1 millisecond in this case. From our small example dataset above, pairs (2,*) and (3,*) each belong to several successive timeWindows.
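The overlap can be made concrete with a small plain-Scala sketch of how an epoch-aligned sliding window assigns an event to windows (the epoch alignment is an assumption here, mirroring what a timeWindow(3,1) would do under it):

```scala
// All sliding windows of `size` ms, advancing every `slide` ms (epoch-aligned),
// that contain an event with timestamp `ts`; each window is [start, start + size).
def slidingWindows(ts: Long, size: Long, slide: Long): Seq[(Long, Long)] = {
  val lastStart = ts - (ts % slide) // latest window start that still covers ts
  (lastStart to (lastStart - size + slide) by -slide).map(s => (s, s + size))
}
// An event at timestamp 2, with size 3 ms and slide 1 ms, sits in three overlapping windows:
println(slidingWindows(2L, 3L, 1L)) // Vector((2,5), (1,4), (0,3))
```

With size equal to slide, each event falls in exactly one window, which is precisely the tumbling case.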

I will share findings of further explorations in the world of Flink, in my upcoming blogs.

I have used Flink 0.10.0 for the code shown above. It can be accessed here and here.

Many thanks to the fantastic folks who are not only behind Flink but also in front of it, tirelessly answering my newbie questions on the Flink User Group and prodding me to understand the concepts.

Written by 

Vikas is the CEO and Co-Founder of Knoldus Inc. Knoldus does niche Reactive and Big Data product development on Scala, Spark, and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). Vikas has been working in the cutting edge tech industry for 20+ years. He was an ardent fan of Java with multiple high load enterprise systems to boast of till he met Scala. His current passions include utilizing the power of Scala, Akka and Play to make Reactive and Big Data systems for niche startups and enterprises who would like to change the way software is developed. To know more, send a mail to or visit

2 thoughts on “Getting close to Apache Flink, albeit in a Träge manner – 2”

  1. “At this point, I have not been able to figure out, how Flink decides the the order in which tuples are printed”
    This is due to the way watermarks flow through operators. By default, a watermark is generated every 200 (or 100) ms. In your toy example, all windows are triggered together.
