Introduction to Apache Beam

Reading Time: 3 minutes

What is Apache Beam?

Apache Beam is an open source, unified model for describing parallel-processing pipelines for both batch and streaming data. The Apache Beam programming model therefore simplifies the mechanics of large-scale data processing.

The Apache Beam model offers helpful abstractions that insulate you from low-level details of distributed processing, such as coordinating individual workers, sharding datasets, and similar tasks. These low-level details are handled entirely by the runner (for example, Dataflow).

Features of Apache Beam

The key features of Apache Beam are as follows:

  1. Unified – Use a single programming model for both batch and streaming use cases.
  2. Portable – Execute pipelines in multiple execution environments; here, execution environments mean different runners, e.g. the Spark Runner, the Dataflow Runner, etc.
  3. Extensible – Write custom SDKs, IO connectors, and transformation libraries.

Apache Beam SDKs and Runners

As of today, there are three Apache Beam programming SDKs:

  1. Java
  2. Python
  3. Golang

Beam runners translate the Beam pipeline into a form compatible with the API of the backend processing engine of your choice. Beam currently supports runners that work with the following backends:

  1. Apache Spark
  2. Google Cloud Dataflow
  3. Apache Flink
  4. Hazelcast Jet
  5. Apache Samza
  6. Twister2

There is also a Direct Runner, which runs pipelines on the host machine and is used for testing purposes.
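
For example, the runner can be chosen through pipeline options, so the same pipeline code can be moved between backends without changes. A minimal sketch (PipelineOptions and PipelineOptionsFactory come from org.apache.beam.sdk.options; the chosen runner's artifact, e.g. beam-runners-direct-java, must be on the classpath):

// Parse standard Beam options from the command line, for example:
//   --runner=DirectRunner    (local testing, the default)
//   --runner=FlinkRunner, --runner=DataflowRunner, etc.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);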

Basic Concepts in Apache Beam

Apache Beam has three main abstractions. They are:

  1. Pipeline
  2. PCollection
  3. PTransform

Pipeline:

The first abstraction to create is a Pipeline. It holds the complete data processing job from start to finish, including reading data, manipulating data, and writing data to a sink. Every pipeline takes in options/parameters that indicate where and how to run.
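
A minimal sketch of that structure, using the same TextIO source and sink style as the example later in this post:

// The Pipeline object ties reading, transforming, and writing together.
Pipeline pipeLine = Pipeline.create();

pipeLine.apply("Read", TextIO.read().from("input.json"))   // source
        .apply("Write", TextIO.write().to("output"));      // sink

// Nothing runs until the pipeline is executed on a runner.
pipeLine.run().waitUntilFinish();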

PCollection:

A PCollection is an abstraction of a distributed data set. A PCollection can be bounded, i.e. finite data, or unbounded, i.e. infinite data. The initial PCollection is created by reading data from the source. From then on, PCollections are the input and output of every step in the pipeline.
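
Besides reading from a source, a small bounded PCollection can also be created directly from in-memory data with the built-in Create transform (org.apache.beam.sdk.transforms.Create), for example:

// A bounded PCollection built from an in-memory list of elements.
PCollection<String> names =
        pipeLine.apply("Create names", Create.of("deepak", "virat", "dhoni"));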

Transform:

A transform is a data processing operation. It is applied to one or more PCollections. Complex transforms can have other transforms nested within them. Every transform has a generic apply method where the logic of the transform sits.
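
For instance, a built-in transform such as MapElements turns one PCollection into another. A short sketch, assuming lines is a PCollection<String> (MapElements and TypeDescriptors are part of the Beam Java SDK):

// A simple transform: map every input line to its length.
PCollection<Integer> lengths = lines.apply("Compute lengths",
        MapElements.into(TypeDescriptors.integers()).via((String line) -> line.length()));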

Example of Pipeline

Here, let’s write a pipeline that outputs all the JSON records where the name starts with a vowel.

Let’s take a sample input. Name the file input.json:

{"name":"deepak", "score":12}
{"name":"virat", "score":23}
{"name":"dhoni", "score":45}
{"name":"rahul", "score": 156}
{"name": "rohit"}
{"name": "shikhar"}

The input is newline-delimited JSON.

Include the following dependencies in your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.24.0</version>
</dependency>

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.24.0</version>
</dependency>
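
The example below also uses Jackson to parse each JSON record. Jackson is usually available transitively through the Beam SDK, but if it is not, you may need to add it explicitly (pick a version compatible with your setup), for example:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.2</version>
</dependency>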

Let’s code the Beam pipeline. Follow the steps below.

1. Create the pipeline
Pipeline pipeLine = Pipeline.create();
// OR 
// Pipeline pipeLine = Pipeline.create(options)


Create a pipeline, which binds together all the PCollections and transforms. Optionally, you can pass PipelineOptions if needed.
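
If you do need options, they can be built from command-line arguments or configured directly; a small sketch (the job name here is purely illustrative):

PipelineOptions options = PipelineOptionsFactory.create();
options.setJobName("vowel-filter-job");   // illustrative job name
Pipeline pipeLine = Pipeline.create(options);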

2. Read the input lines
PCollection<String> inputCollection = pipeLine.apply("Read My File", TextIO.read().from("input.json"));

Use the TextIO transform to read the input file. Every line is a separate JSON record.
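
TextIO.read().from(...) also accepts file patterns, so several newline-delimited files can be read into one PCollection; a sketch using a hypothetical data/*.json pattern:

// Each line of every matching file becomes one element of the PCollection.
PCollection<String> allInputs =
        pipeLine.apply("Read All Files", TextIO.read().from("data/*.json"));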

3. Apply a transform to filter out the names starting with a vowel
PCollection<String> filteredCollection = inputCollection.apply("Filter names starting with vowels", Filter.by(new SerializableFunction<String, Boolean>() {

        public Boolean apply(String input) {
            ObjectMapper jacksonObjMapper = new ObjectMapper();
            try {
                JsonNode jsonNode = jacksonObjMapper.readTree(input);
                String name = jsonNode.get("name").textValue();
                return vowels.contains(name.substring(0,1).toLowerCase());
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return false;
        }
    }));

The Filter transform takes a SerializableFunction object, whose logic lives in the overridden apply method. Every JSON-string record is parsed into a JSON node, and the first character of the name is checked against the set of vowels. The transform is applied to each input JSON record, and based on the returned boolean value the record is retained or discarded.
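
Since SerializableFunction is a functional interface, the same predicate can also be written as a Java 8 lambda; a sketch equivalent to the anonymous class above:

SerializableFunction<String, Boolean> startsWithVowel = input -> {
    try {
        JsonNode jsonNode = new ObjectMapper().readTree(input);
        String name = jsonNode.get("name").textValue();
        return vowels.contains(name.substring(0, 1).toLowerCase());
    } catch (JsonProcessingException e) {
        return false;   // drop records that cannot be parsed
    }
};

PCollection<String> filteredCollection =
        inputCollection.apply("Filter names starting with vowels", Filter.by(startsWithVowel));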

4. Write the results to a file
filteredCollection.apply("write to file", TextIO.write().to("result").withSuffix(".txt").withoutSharding());

The results of the filter transform are stored in a text file using the write method of the TextIO transform. Because processing is distributed across machines, the results are normally written to multiple files/shards. To avoid this, we use withoutSharding, so that all the output is written to a single file.
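
Without withoutSharding, the output would typically be split into files named like result-00000-of-00003.txt. If you only want to limit (rather than disable) sharding, withNumShards is an alternative:

// Ask for a single output shard instead of disabling sharding entirely.
filteredCollection.apply("write to file",
        TextIO.write().to("result").withSuffix(".txt").withNumShards(1));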

Output:
{"name": "rohit"}
{"name": "shikhar"}
{"name":"deepak", "score":12}

Complete Code:
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.SerializableFunction;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// An example main class wrapping the pipeline; the class name is arbitrary.
public class VowelFilterPipeline {

    public static void main(String[] args) {
        // Create the pipeline that binds all the PCollections and transforms together.
        Pipeline pipeLine = Pipeline.create();

        final Set<String> vowels = new HashSet<String>(Arrays.asList("a", "e", "i", "o", "u"));

        pipeLine.apply("Read My File",
                        TextIO.read().from("input.json"))
                // Keep only the records whose name starts with a vowel.
                .apply("Filter names starting with vowels", Filter.by(new SerializableFunction<String, Boolean>() {

                    public Boolean apply(String input) {
                        ObjectMapper jacksonObjMapper = new ObjectMapper();
                        try {
                            JsonNode jsonNode = jacksonObjMapper.readTree(input);
                            String name = jsonNode.get("name").textValue();
                            return vowels.contains(name.substring(0, 1).toLowerCase());
                        } catch (JsonProcessingException e) {
                            e.printStackTrace();
                        }
                        return false;
                    }
                }))
                // Write the filtered records to a single output file.
                .apply("write to file", TextIO.write().to("result").withSuffix(".txt").withoutSharding());

        pipeLine.run().waitUntilFinish();
    }
}
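
To try this locally, the Direct Runner (declared in the pom.xml above) is used when no other runner is specified. For example, with the exec-maven-plugin and the arbitrary class name used in the sketch above (adjust it to your project):

mvn compile exec:java -Dexec.mainClass=VowelFilterPipeline

Once the pipeline finishes, the filtered records end up in result.txt in the working directory.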

Written by 

Deepak Kumar is a Software Intern at Knoldus Inc. He did his post graduation at Ajay Kumar Garg Engineering College, Ghaziabad. He has knowledge of various programming languages. He is passionate about Java development and curious to learn Java technologies. He is a quick learner, a problem solver, and always enjoys helping others. His hobbies are playing cricket and watching Hollywood movies.