From the preceding post in this series
A case for time, rather than count
Returning to the example from the preceding post, where we want to know more about the cars passing through a particular street junction, we can make two types of observations: (1) for every 100 cars crossing the junction in either direction, how many times do the signals go RED, and (2) in every 5 minutes, how many cars pass through the junction in either direction. The first is about a window of a fixed number of cars, ignoring how long we may have to wait for 100 cars to pass by (on a holiday in the office district, perhaps). The second is about a window of a fixed duration, regardless of the number of cars that pass by (maybe 0, 5 or 75). Flink gives us CountWindow for the former and TimeWindow for the latter. In this post, I take a look at the TimeWindow feature of Flink.
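To make the difference concrete, here is a small sketch in plain Scala (no Flink involved). The Car class, the arrival times and the two helper functions are invented for illustration and are not part of Flink's API; the sizes are scaled down to keep the example short.

```scala
object CountVersusTime {
  // Hypothetical event: a car crossing the junction at a given second.
  final case class Car(id: Int, arrivalSec: Long)

  // Count-based grouping: every `n` cars form one group, however long that takes.
  def byCount(cars: Seq[Car], n: Int): List[Seq[Car]] =
    cars.grouped(n).toList

  // Time-based grouping: every `sizeSec` seconds form one group, however many cars arrive.
  // Intervals in which no car arrives simply do not show up in the result.
  def byTime(cars: Seq[Car], sizeSec: Long): Seq[(Long, Seq[Car])] =
    cars.groupBy(c => c.arrivalSec / sizeSec * sizeSec).toSeq.sortBy(_._1)

  val sampleCars: Seq[Car] =
    Seq(Car(1, 0L), Car(2, 10L), Car(3, 20L), Car(4, 400L), Car(5, 410L))

  def main(args: Array[String]): Unit = {
    // Groups of (at most) two cars each, regardless of the gap before car 4.
    println(byCount(sampleCars, 2).map(_.map(_.id)))
    // Groups per 300-second interval, keyed by the start of the interval.
    println(byTime(sampleCars, 300L).map { case (start, cs) => start -> cs.map(_.id) })
  }
}
```

With this sample, grouping by count yields groups of at most two cars regardless of the long gap before car 4, while grouping by 300-second intervals puts three cars in the first interval and two in the next.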
Time, in the world of Flink’s windows
In the world of Flink, the ‘time’ that is associated with an ‘event’ has great significance. An event always has a timestamp. This timestamp helps Flink decide how to treat the event (which transformation can be applied to it). If we think a little about it, this is rather intuitive. In an endless flow of events, which are anonymous and opaque to Flink when they arrive, the only determinable aspect that Flink can associate with each of them is when it came into being. If the source that generates the events already puts such a timestamp on every event, then Flink has less work to do. In fact, there are many situations where one needs to analyse events based on when they were generated and not on when they are seen by Flink. As an alternative, we can ask Flink to attach a timestamp to every arriving event. Either way, every downstream operation can safely assume that every event carries a timestamp and can use it during its computations.
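Flink recognizes three notions of time:
- Processing time: the clock of the machine that is executing the operation at the moment it handles the event.
- Event time: the time at which the event came into being at its source, carried as a timestamp inside the event.
- Ingestion time: the time at which the event enters Flink, assigned at the source operator.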
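The three notions differ only in which clock supplies the timestamp: the source that created the event (event time), the moment the event enters Flink (ingestion time), or the machine running the operation (processing time). The sketch below is plain Scala and purely illustrative; the Event class, the TimeNotion type and timestampFor are hypothetical names of mine, not Flink's API.

```scala
object NotionsOfTime {
  sealed trait TimeNotion
  case object EventTime      extends TimeNotion
  case object IngestionTime  extends TimeNotion
  case object ProcessingTime extends TimeNotion

  // Hypothetical record: the timestamp set by the source travels inside the event.
  final case class Event(payload: String, createdAtMs: Long)

  // Picks the timestamp that a window would work with under each notion of time.
  def timestampFor(notion: TimeNotion,
                   event: Event,
                   enteredAtMs: Long,      // when the event entered the pipeline
                   operatorClockMs: Long   // clock of the machine handling it now
                  ): Long =
    notion match {
      case EventTime      => event.createdAtMs
      case IngestionTime  => enteredAtMs
      case ProcessingTime => operatorClockMs
    }

  def main(args: Array[String]): Unit = {
    val e = Event("reading", createdAtMs = 1000L)
    // The same event, seen under each of the three notions of time.
    Seq(EventTime, IngestionTime, ProcessingTime).foreach { n =>
      println(s"$n -> ${timestampFor(n, e, enteredAtMs = 1500L, operatorClockMs = 1800L)}")
    }
  }
}
```

The same event ends up in different windows depending on the notion chosen, which is why the choice has to be made explicitly.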
Back to Flink
We are using the sample dataset obtained from PubNub, as we did earlier. This time, we are going to consider the timestamp that is part of the data that we read. Here’s a row from the file sampleIOT.csv:
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
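For readers who want to see how such a row might be turned into a typed record, here is a minimal sketch in plain Scala. It assumes that the fifth field is the timestamp and the sixth the ambient temperature, which is my reading of the row above. Only timeStamp and ambientTemperature are names used in this post; Reading, sensorId, field2 to field4 and parse are hypothetical, and this is not the readIncomingReadings function used below.

```scala
import scala.util.Try

object ParseReading {
  // Only `timeStamp` and `ambientTemperature` are named in the post;
  // the remaining field names are placeholders.
  final case class Reading(sensorId: String,
                           field2: Int,
                           field3: Double,
                           field4: Double,
                           timeStamp: Long,
                           ambientTemperature: Double)

  // Returns None for rows that do not have six well-formed fields.
  def parse(line: String): Option[Reading] =
    line.split(",").map(_.trim) match {
      case Array(id, f2, f3, f4, ts, temp) =>
        Try(Reading(id, f2.toInt, f3.toDouble, f4.toDouble, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val row = "probe-f076c2b0,201,842.53,75.5372,1448028160,29.37"
    // Project the parsed row onto the (timeStamp, ambientTemperature) pair used later.
    println(parse(row).map(r => (r.timeStamp, r.ambientTemperature)))
  }
}
```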
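// Local environment with parallelism 1; windows are driven by event time
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings =
  readIncomingReadings(env, "./sampleIOT.csv")
    .map(e => (e.timeStamp, e.ambientTemperature))
    // use the first field of the pair as an ascending event-time timestamp
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    // tumbling window of 3 ms over the whole (non-keyed) stream
    .timeWindowAll(Time.milliseconds(3))
    // keep the pair with the highest temperature (field 1)
    .maxBy(1)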
1448028160,30.02
1448028160,25.92
1448028160,22.18
1448028161,16.18
1448028161,16.36
1448028161,19.19
1448028162,18.99
1448028162,27.62
1448028162,18.82
First window (of 3 milliseconds): the highest ambient temperature is 30.02
1448028163,20.44
1448028163,16.18
1448028163,21.57
1448028164,22.66
1448028164,27.83
1448028164,23.22
1448028165,19.69
1448028165,21.59
Second window (of 3 milliseconds): the highest ambient temperature is 27.83
The program prints:
(1448028166,32.06)
(1448028160,30.02)
(1448028163,27.83)
// The same pipeline, with the tumbling window shrunk to 1 millisecond
val readings =
  readIncomingReadings(env, "./sampleIOT.csv")
    .map(e => (e.timeStamp, e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    .timeWindowAll(Time.milliseconds(1))
    .maxBy(1)
(1448028166,32.06)
(1448028162,27.62)
(1448028164,24.34)
(1448028167,26.37)
(1448028165,23.73)
(1448028163,27.83)
(1448028161,29.43)
(1448028160,30.02)
Sliding Time Window
The timeWindow that we have used here is a tumbling window. We recall from the earlier blog-post that once we apply our computation (here, maxBy() ) to its contents, a tumbling window gets rid of (tumbles) its contents, by consigning them to oblivion.
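The tumbling behaviour can be imitated without Flink. The sketch below is plain Scala over made-up readings. It assumes non-negative timestamps and window starts aligned to multiples of the window size; that alignment is an assumption of this sketch rather than a statement about Flink's internals, and the names TumblingSketch, windowStart and maxPerTumblingWindow are mine.

```scala
object TumblingSketch {
  type Reading = (Long, Double) // (timeStamp in ms, temperature)

  // Each timestamp falls into exactly one window, identified here by its start.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // For every window, keep the reading with the highest temperature.
  def maxPerTumblingWindow(readings: Seq[Reading], sizeMs: Long): Seq[(Long, Reading)] =
    readings
      .groupBy(r => windowStart(r._1, sizeMs))
      .toSeq
      .sortBy(_._1)
      .map { case (start, rs) => start -> rs.maxBy(_._2) }

  // Made-up readings, one per millisecond.
  val sample: Seq[Reading] =
    Seq((0L, 20.5), (1L, 19.05), (2L, 21.05), (3L, 11.05), (4L, 15.05), (5L, 9.05), (6L, 28.05))

  def main(args: Array[String]): Unit =
    // Windows of 3 ms: [0,3), [3,6), [6,9); no reading appears in two windows.
    maxPerTumblingWindow(sample, 3L).foreach(println)
}
```

Because every reading maps to a single window start, nothing is shared between consecutive windows, which is exactly what distinguishes a tumbling window from a sliding one.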
As one expects, there is a variant of timeWindow which behaves as a sliding window. We can put it to use, thus:
val readings =
  readIncomingReadings(env, "./sampleIOT.csv")
    .map(e => (e.timeStamp, e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    // sliding window: size 3 ms, sliding forward by 1 ms
    .timeWindowAll(Time.milliseconds(3), Time.milliseconds(1))
    .maxBy(1)
When our small program is run using the actual data file (sampleIOT.csv), Flink generates the following output:
(1448028166,32.06)
(1448028166,32.06)
(1448028160,30.02)
(1448028167,26.37)
(1448028161,29.43)
(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)
(1448028163,27.83)
(1448028160,30.02)
To understand how Flink generates the output above, let us use a smaller dataset (not what sampleIOT.csv contains). We consider a series of pairs of values; each pair consists of (timeStamp in ms, temperature recorded in degrees):
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05),(3,9.05),(3,13.05),(4,28.05),(4,29.05),(5,26.05)
Remember that we are using a sliding timeWindow of size 3 and slide 1 (both in milliseconds), that the applicable ‘notion of time’ is event time, and that we have instructed Flink to use the timeStamp of each pair as an ascending event timestamp.
timeWindow sliding this way ->

timeStamp == 1
(1,20.5),(1,19.05),(1,21.05)
Flink prints: 21.05

timeStamp == 1,2
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05)
Flink prints: 21.05

timeStamp == 1,2,3
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05),(3,9.05),(3,13.05)
First timeWindow created, Flink prints: 21.05

timeStamp == 2,3,4
(2,11.05),(2,15.05),(3,9.05),(3,13.05),(4,28.05),(4,29.05)
Second timeWindow created (after slide), Flink prints: 29.05

timeStamp == 3,4,5
(3,9.05),(3,13.05),(4,28.05),(4,29.05),(5,26.05)
Third timeWindow created (after slide), Flink prints: 29.05

timeStamp == 4,5
(4,28.05),(4,29.05),(5,26.05)
Fourth timeWindow created (after slide), Flink prints: 29.05

timeStamp == 5
(5,26.05)
Fifth and final timeWindow created (after slide), Flink prints: 26.05
This may not be the best representation of what really happens, but I hope that this provides sufficient visualization of the same! 🙂
As we observe, in a sliding window, events are grouped according to their timestamps, each window spanning 3 milliseconds in this case. Also, an event can belong to more than one window, because the window slides progressively, by 1 millisecond in this case. In our small example dataset above, the pairs (2,*) and (3,*) each appear in several successive timeWindows.
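The walk-through above can be reproduced with a few lines of plain Scala. This is only a sketch of the idea, not Flink's implementation: it assumes non-negative timestamps and window starts aligned to multiples of the slide, and the names SlidingSketch, windowStarts and maxPerSlidingWindow are mine.

```scala
object SlidingSketch {
  type Reading = (Long, Double) // (timeStamp in ms, temperature)

  // Starts of all windows (size `sizeMs`, slide `slideMs`) that contain `ts`.
  def windowStarts(ts: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList
  }

  // Copy every reading into each window that covers it, then take the
  // highest temperature per window, ordered by window start.
  def maxPerSlidingWindow(readings: Seq[Reading], sizeMs: Long, slideMs: Long): Seq[(Long, Double)] =
    readings
      .flatMap(r => windowStarts(r._1, sizeMs, slideMs).map(start => start -> r))
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      .map { case (start, rs) => start -> rs.map(_._2._2).max }

  // The small dataset used in the walk-through above.
  val toy: Seq[Reading] = Seq(
    (1L, 20.5), (1L, 19.05), (1L, 21.05), (2L, 11.05), (2L, 15.05),
    (3L, 9.05), (3L, 13.05), (4L, 28.05), (4L, 29.05), (5L, 26.05))

  def main(args: Array[String]): Unit =
    maxPerSlidingWindow(toy, sizeMs = 3L, slideMs = 1L).foreach(println)
}
```

On the toy dataset this produces seven windows whose maxima are 21.05, 21.05, 21.05, 29.05, 29.05, 29.05 and 26.05, in line with the values shown in the walk-through. Note that this sketch emits the windows in order of their start, which says nothing about the order in which Flink prints them.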
I will share findings of further explorations in the world of Flink, in my upcoming blogs.
I have used Flink 0.10.0 for the code shown above. It can be accessed here and here.
Many thanks to the fantastic folks who are not only behind Flink, but also in front of it, tirelessly answering my newbie questions in the Flink User Group and prodding me to understand the concepts.
“At this point, I have not been able to figure out how Flink decides the order in which tuples are printed”
This is due to the way watermarks flow through operators. By default, a watermark is generated every 200 (or 100) ms. In your toy example, all windows are triggered together.
Yes, you are right. I should have edited it in time. I have been lazy. Thanks again.
I plan to take this up in a separate blog, later.