Flink: Implementing the Session window.


In the previous blogs, we learned about Tumbling, Sliding, and Count windows in Flink. Flink offers one more useful way to window data: the Session window. In this blog, we will explore the Session window in detail with an example.

In the real world, much of what we do online, such as visiting a website, clicking around it, or making online transactions, happens in sessions. We might go to an e-commerce website like Amazon, look for products, click around for a bit, and then stop. All of this happens within one session. A website may want to track the pages we visited in a single session. To do that, it needs to group the incoming stream of clicks by session. Flink's Session window makes such streaming use cases easy to implement.

The Session window assigner groups elements by sessions of activity. Session windows do not overlap and, unlike tumbling and sliding windows, do not have a fixed start and end time. The number of elements in a session window is not fixed either, because the user defines only the gap of inactivity, not the length of the session. A session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurs. For example, if the gap is 1 minute and we stay idle on the Amazon website for more than 1 minute, the previous session ends; when we come back to the site after that, a new session starts. In other words, the session boundary is determined by the pause between one click and the next.
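To make the gap rule concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes the timestamps are already sorted and expressed in milliseconds, and it starts a new session whenever the pause since the previous element is longer than the gap. The names `GapSessions` and `sessionize` are hypothetical, and treating a pause of exactly the gap as still inside the session is a choice of this sketch, not a statement about Flink's exact boundary handling.

```java
import java.util.ArrayList;
import java.util.List;

public class GapSessions {

    // Splits ascending timestamps (ms) into sessions: a new session starts
    // whenever the pause since the previous element is longer than gapMs.
    static List<List<Long>> sessionize(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedTimestamps) {
            if (previous != null && ts - previous > gapMs) {
                sessions.add(current);          // inactivity gap: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);              // flush the last open session
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Clicks at 0 s, 20 s, 45 s, a long pause, then 150 s and 170 s (gap = 60 s).
        List<Long> clicks = List.of(0L, 20_000L, 45_000L, 150_000L, 170_000L);
        // prints [[0, 20000, 45000], [150000, 170000]]
        System.out.println(sessionize(clicks, 60_000L));
    }
}
```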

Session window in Action

In the illustration above, the gaps between the elements determine when a window starts and when it ends. One of the windows is very large because none of the gaps inside it was large enough to start a new session, so all the elements that arrived during that time belong to the same session. With a session window, we specify the gap of inactivity after which a new session begins.
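Flink's documentation describes the session window operator as creating a new window for each arriving element and merging windows that are closer to each other than the gap. The plain-Java sketch below imitates that idea: every timestamp gets its own window [ts, ts + gap), and windows that overlap or touch are merged into one session. Unlike a sequential split, this also works when elements arrive out of order. `MergingSessions`, `Window`, and `assignAndMerge` are hypothetical names, and this is a simplified model, not Flink's actual `MergingWindowAssigner` code.

```java
import java.util.ArrayList;
import java.util.List;

public class MergingSessions {

    // A time window [start, end) in milliseconds.
    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Gives each timestamp its own window [ts, ts + gapMs), then merges windows
    // that overlap or touch. The input may be in any order.
    static List<Window> assignAndMerge(List<Long> timestamps, long gapMs) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Window(ts, ts + gapMs));
        }
        windows.sort((a, b) -> Long.compare(a.start, b.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            Window last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && last.end >= w.start) {
                // Overlapping or touching: extend the current session.
                merged.set(merged.size() - 1, new Window(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w); // pause longer than the gap: a new session begins
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Out-of-order arrival times with a 10-second gap.
        List<Long> arrivals = List.of(12_000L, 0L, 5_000L, 40_000L, 31_000L);
        for (Window w : assignAndMerge(arrivals, 10_000L)) {
            System.out.println("session [" + w.start + ", " + w.end + ")");
        }
        // prints:
        // session [0, 22000)
        // session [31000, 50000)
    }
}
```

Because each window ends one gap after its element, two merged windows always mean the elements were no more than one gap apart, which is exactly the session rule.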

Session window example

Let's write a simple Flink application that uses a session window to track the maximum time, in minutes, that a user spent on a web page in one session. Every element in the stream carries three pieces of information: the user name, the web page visited, and how long the user spent on that page in minutes. Each element arrives as a comma-separated line of the form user,page,minutes.

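import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Class name is illustrative; the example only needs main().
public class SessionWindowExample {

    private static final Logger LOGGER = LoggerFactory.getLogger(SessionWindowExample.class);

    // execute() throws a checked Exception, so main must declare it.
    public static void main(String[] args) throws Exception {
        LOGGER.info("Session window example.");

        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from localhost:9000; the last argument (6)
        // limits how long the source keeps retrying if the socket is unavailable.
        DataStream<String> text = executionEnvironment
                .socketTextStream("localhost", 9000, "\n", 6);

        // Parse "user,page,minutes" into (user, page, minutes).
        DataStream<Tuple3<String, String, Double>> userClickStream = text.map(row -> {
            String[] fields = row.split(",");
            if (fields.length == 3) {
                return new Tuple3<>(
                        fields[0].trim(),
                        fields[1].trim(),
                        Double.parseDouble(fields[2].trim())
                );
            }
            throw new IllegalArgumentException("Not a valid record: " + row);
        }, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>() {
        }));

        // Key by (user, page) and take the maximum of field 2 (minutes) per session.
        DataStream<Tuple3<String, String, Double>> maxPageVisitTime = userClickStream
                .keyBy((KeySelector<Tuple3<String, String, Double>, Tuple2<String, String>>)
                                click -> new Tuple2<>(click.f0, click.f1),
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                        }))
                // A key's session closes after 10 seconds without data for that key.
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .max(2);

        maxPageVisitTime.print();

        executionEnvironment.execute("Session window example.");
    }
}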

The above example is based on processing time and uses a processing-time session window with a gap of 10 seconds. Because the stream is keyed by (user, page), the gap is tracked separately for each key: if more than 10 seconds pass between two consecutive messages for the same key, the later message starts a new session, while messages that arrive within 10 seconds of one another stay in the same session.
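The following plain-Java sketch models what the job computes, without Flink: clicks are grouped by (user, page), each key's clicks are split into sessions using the 10-second gap, and the maximum minutes value of each session is reported. It assumes the clicks are listed in arrival order with made-up arrival times in milliseconds; `KeyedSessionMax`, `Click`, and `maxPerSession` are hypothetical names. The sketch lists results key by key, whereas Flink emits each session's result only when that session's gap expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedSessionMax {

    // One click: arrival time (ms), user, page, and minutes spent on the page.
    static final class Click {
        final long arrivalMs;
        final String user;
        final String page;
        final double minutes;

        Click(long arrivalMs, String user, String page, double minutes) {
            this.arrivalMs = arrivalMs;
            this.user = user;
            this.page = page;
            this.minutes = minutes;
        }
    }

    // Groups clicks by (user, page), splits each key's clicks into sessions with
    // the inactivity gap, and returns "user,page -> max" for every session.
    // Assumes the input list is in arrival order.
    static List<String> maxPerSession(List<Click> clicks, long gapMs) {
        Map<String, List<Click>> byKey = new LinkedHashMap<>();
        for (Click c : clicks) {
            byKey.computeIfAbsent(c.user + "," + c.page, k -> new ArrayList<>()).add(c);
        }

        List<String> results = new ArrayList<>();
        for (Map.Entry<String, List<Click>> entry : byKey.entrySet()) {
            double max = Double.NEGATIVE_INFINITY;
            long previous = 0L;
            boolean first = true;
            for (Click c : entry.getValue()) {
                if (!first && c.arrivalMs - previous > gapMs) {
                    // Inactivity gap for this key: report the finished session.
                    results.add(entry.getKey() + " -> " + max);
                    max = Double.NEGATIVE_INFINITY;
                }
                max = Math.max(max, c.minutes);
                previous = c.arrivalMs;
                first = false;
            }
            results.add(entry.getKey() + " -> " + max); // last session of this key
        }
        return results;
    }

    public static void main(String[] args) {
        long gapMs = 10_000L; // 10 seconds, matching Time.seconds(10) in the job
        List<Click> clicks = List.of(
                new Click(0L, "alice", "home", 2.0),
                new Click(3_000L, "bob", "cart", 1.5),
                new Click(6_000L, "alice", "home", 4.0),
                new Click(9_000L, "alice", "home", 3.0),
                new Click(25_000L, "alice", "home", 1.0)); // 16 s after alice's last click
        // prints "alice,home -> 4.0", "alice,home -> 1.0", "bob,cart -> 1.5" on separate lines
        maxPerSession(clicks, gapMs).forEach(System.out::println);
    }
}
```

Alice's click at 25 s arrives 16 seconds after her previous click, so it opens a new session, while Bob's click is tracked under its own key and never interrupts Alice's session.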

Let's run this Flink application and see how it behaves. Open a terminal and run the command below to start a socket server on port 9000. Start it before the job, because socketTextStream connects to it as a client:

  nc -l 9000

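Then run the Flink application and type some messages into the netcat window. If the job runs on a local Flink cluster, print() writes to the TaskManager's .out file, so open a new terminal in the Flink directory and run the command below to see the output: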

tail -f log/flink-*-taskexecutor-*.out

Pass four messages in the netcat window and then pause for more than 10 seconds. If the four messages share the same user and page, they form one session, and once the gap expires the application prints the maximum time in minutes that the user spent on that page during the session.

You can find this sample Flink application here. Clone it, run it, and play around with it to see how the Session window behaves.

Happy Blogging and keep learning!!
