Flink: Union operator on Multiple Streams

Apache Flink offers a rich set of APIs and operators that make application developers productive when dealing with multiple data streams. Flink provides many multi-stream operations such as Union, Join, and so on. In this blog, we will explore the Union operator in Flink, which combines two or more data streams.

In real-time processing, we often have multiple data streams coming from different sources and apply transformations to each of them separately. Sometimes, though, we want to perform the same kind of transformation on the elements of all these streams. In that case, it is good practice to bring them together into a single stream and apply the transformation once.

In Flink, these data streams can be combined into a single stream using the union operation. The resulting stream contains the elements of all the original streams included in the union, so the streams must share the same element type.

Examples

Let’s write a simple Flink application for the union operation. Let’s say we have two data streams as our sources, both fed by the netcat utility running on different ports, 9000 and 9009. If either stream is null, the application simply exits, since we can’t proceed further. Otherwise, the union operation combines the two streams into a single stream.

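// Imports assume Flink's Scala DataStream API is being called from Java,
// which matches the Seq-based union(...) call below.
import java.util.ArrayList;

import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import scala.collection.JavaConverters;
import scala.collection.Seq;

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment executionEnvironment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Source 1: text lines from netcat on port 9000 ('\n' delimiter, maxRetry of 6).
    final DataStream<String> stream1 = executionEnvironment
            .socketTextStream("localhost", 9000, '\n', 6);

    // Source 2: text lines from netcat on port 9009.
    final DataStream<String> stream2 = executionEnvironment
            .socketTextStream("localhost", 9009, '\n', 6);

    // Exit if either source is missing.
    if (stream1 == null || stream2 == null) {
        System.exit(1);
        return;
    }

    // Wrap stream2 in a Scala Seq, the parameter type that union expects here.
    ArrayList<DataStream<String>> dataStreams = new ArrayList<>();
    dataStreams.add(stream2);
    Seq<DataStream<String>> dataStreamSeq = JavaConverters
            .asScalaIteratorConverter(dataStreams.iterator())
            .asScala().toSeq();

    // Combine both streams and print every element of the result.
    DataStream<String> union = stream1.union(dataStreamSeq);
    union.print();

    executionEnvironment.execute("Flink Union Example");
}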

In the above code snippet, the union of stream1 and stream2 gives us the resulting combined stream.

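Note: You can’t pass a data stream to the union method directly in this snippet, because the union method used here takes a Seq<DataStream<T>> as its parameter (as the Scala API’s DataStream does when called from Java). The shorter snippets further down pass streams directly, which works with the Java API’s DataStream, whose union method accepts varargs.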
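
For comparison, here is a minimal sketch of the same kind of union written against the Java API (org.apache.flink.streaming.api.datastream.DataStream), whose union method is declared with varargs and so needs no Seq conversion. It assumes Flink 1.x on the classpath. The class name, the buildUnion helper, and the sample elements are made up for illustration, and fromElements stands in for the socket sources so the sketch runs without netcat.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionVarargsSketch {

    // Builds the union of three small in-memory streams (hypothetical sample data).
    static DataStream<String> buildUnion(StreamExecutionEnvironment env) {
        DataStream<String> first = env.fromElements("a1", "a2");
        DataStream<String> second = env.fromElements("b1", "b2");
        DataStream<String> third = env.fromElements("c1");

        // Java API: union(DataStream<T>... streams), so streams are passed directly.
        return first.union(second, third);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Prints all five elements; their relative order is not guaranteed.
        buildUnion(env).print();

        env.execute("Flink Union Varargs Sketch");
    }
}
```

Which form applies depends on which DataStream class the project imports, so it is worth checking the import before copying either version.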

To run the application, open two terminals and start a netcat listener in each, one on port 9000 and the other on port 9009. The streaming application is going to read from these ports.

nc -l 9000
nc -l 9009

Now run the Flink application and tail the task executor log to see the output.

tail -f log/flink--taskexecutor-.out

As we enter messages in either of these two netcat windows, they are combined into a single stream and appear together in the output.

We can also union a stream with itself. Sometimes there is a use case where we want to process each element of a stream twice. For that, we can union the stream with itself, and every element then appears twice in the resulting stream.

DataStream<String> union = stream1.union(stream1);
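
The effect of a self-union can be pictured with a plain-Java model. This is not Flink code: lists stand in for streams, and the UnionModel class and its unionModel helper are hypothetical. Real Flink streams are unbounded and interleave their elements in no guaranteed order. The point of the sketch is only that union keeps duplicates rather than removing them the way a set union would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionModel {

    // Models union on finite lists: every element of every input is kept.
    @SafeVarargs
    static <T> List<T> unionModel(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> stream1 = Arrays.asList("a", "b");
        List<String> stream2 = Arrays.asList("c");

        // Two different inputs: all elements end up in one result.
        System.out.println(unionModel(stream1, stream2)); // [a, b, c]

        // The same input twice: each element appears twice.
        System.out.println(unionModel(stream1, stream1)); // [a, b, a, b]
    }
}
```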

The union operation in Flink can also be run on more than two streams at a time.

DataStream<String> union = stream1.union(stream1, stream2);

Happy Blogging and keep Learning!!
