Introduction to Apache Beam

Reading Time: 4 minutes

Introduction

Apache Beam (Batch + strEAM) is a unified programming model for batch and streaming data processing jobs. It provides a software development kit to define and construct data processing pipelines as well as runners to execute them.

Apache Beam is designed to provide a portable programming layer. The Beam Pipeline Runners translate the data processing pipeline into the API of the backend of the user’s choice. Currently, the following distributed processing backends are supported:

  • Apache Apex
  • Hazelcast Jet
  • Apache Gearpump (incubating)
  • Apache Samza
  • Google Cloud Dataflow
  • Apache Spark
  • Apache Flink

Why Apache Beam?

Apache Beam fuses batch and streaming data processing, while many other frameworks handle them through separate APIs. Consequently, it’s very easy to change a streaming process to a batch process and vice versa as requirements change.

Apache Beam improves portability and flexibility. We can focus on our processing logic rather than the underlying details, and we can change the data processing backend at any time.

Fundamental Concepts

With Apache Beam, we can construct workflow graphs (pipelines) and execute them. The key concepts in the programming model are:

  • PCollection – A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.
  • PTransform – A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.
  • Pipeline – A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.
  • PipelineRunner – A PipelineRunner executes a Pipeline on a specified distributed processing backend.

Simply put, a PipelineRunner executes a Pipeline, and a Pipeline consists of PCollections and PTransforms.
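To make these abstractions concrete before we move on to word count, here is a minimal, illustrative pipeline built from in-memory data. It only uses standard Beam SDK classes; the class and variable names are just for this sketch:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalPipeline {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        // PCollection: a (bounded) data set, created here from in-memory data
        PCollection<String> words = pipeline.apply(Create.of(Arrays.asList("Apache", "Beam")));

        // PTransform: a processing step applied to the PCollection
        words.apply(MapElements.into(TypeDescriptors.strings())
            .via(word -> "Hello " + word));

        // The PipelineRunner on the classpath (for example, DirectRunner) executes the Pipeline
        pipeline.run().waitUntilFinish();
    }
}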

Word Count Example

Now that we’ve learned the basic concepts of Apache Beam, let’s design and test a word count task.

Constructing a Beam Pipeline

Designing the workflow graph is the first step in every Apache Beam job. Let’s define the steps of a word count task:

  1. Read the text from a source.
  2. Split the text into a list of words.
  3. Lowercase all words.
  4. Trim punctuation from each word.
  5. Filter stopwords.
  6. Count each unique word.

To achieve this, we’ll need to convert the above steps into a single Pipeline using PCollection and PTransform abstractions.

Dependencies

Before we can implement our workflow graph, we should add Apache Beam’s core dependency to our project:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>

Beam Pipeline Runners rely on a distributed processing backend to perform tasks. Let’s add DirectRunner as a runtime dependency:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>

Unlike other Pipeline Runners, DirectRunner doesn’t need any additional setup, which makes it a good choice for getting started.

Implementation

Apache Beam builds on the MapReduce programming paradigm (much like Java Streams). It’s a good idea to have a basic understanding of reduce(), filter(), count(), map(), and flatMap() before we continue.
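Purely as a point of comparison (this is plain Java Streams, not Beam), counting words in an in-memory list with the same style of operations might look like this:

// Java Streams analogy; uses java.util.stream, java.util.function and java.util.Arrays
Map<String, Long> counts = Stream.of("Apache Beam rocks", "Apache Beam")
    .flatMap(line -> Arrays.stream(line.split("\\s")))   // flatMap: lines to words
    .map(String::toLowerCase)                             // map: normalize case
    .filter(word -> !word.isEmpty())                      // filter: drop empty tokens
    .collect(Collectors.groupingBy(Function.identity(),   // group identical words
        Collectors.counting()));                          // count occurrences per word

Beam’s FlatMapElements, MapElements, Filter, and Count transforms play the same roles, but over a distributed PCollection rather than an in-memory stream.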

Creating a Pipeline is the first thing we do.

The Pipeline abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a Pipeline object, and then using that object as the basis for creating the pipeline’s data sets as PCollections and its operations as Transforms.

To use Beam, your driver program must first create an instance of the Beam SDK class Pipeline (typically in the main() function). When you create your Pipeline, you’ll also need to set some configuration options. You can set your pipeline’s configuration options programmatically, but it’s often easier to set the options ahead of time (or read them from the command line) and pass them to the Pipeline object when you create the object.

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
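As an optional variant (not required for this example), the options can also be parsed from the command-line arguments, which makes it easy to choose the runner with a flag instead of changing code:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .create();
Pipeline p = Pipeline.create(options);

With this in place, a flag such as --runner=DirectRunner (or the runner of any other backend on the classpath) selects where the pipeline runs.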

Now we apply our six-step word count task:

PCollection<KV<String, Long>> wordCount = p
    .apply("(1) Read all lines", 
      TextIO.read().from(inputFilePath))
    .apply("(2) Flatmap to a list of words", 
      FlatMapElements.into(TypeDescriptors.strings())
      .via(line -> Arrays.asList(line.split("\\s"))))
    .apply("(3) Lowercase all", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> word.toLowerCase()))
    .apply("(4) Trim punctuations", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> trim(word)))
    .apply("(5) Filter stopwords", 
      Filter.by(word -> !isStopWord(word)))
    .apply("(6) Count words", 
      Count.perElement());

The first (optional) argument of apply() is a String used only to make the code more readable. Here is what each apply() does in the above code:

  1. First, we read an input text file line by line using TextIO.
  2. Next, we split each line by whitespace and flat-map it to a list of words.
  3. Word count is case-insensitive, so we lowercase all words.
  4. Earlier, we split lines by whitespace, ending up with words like “word!” and “word?”, so we trim the punctuation with a small helper method (see the sketch after this list).
  5. Stopwords such as “is” and “by” are frequent in almost every English text, so we remove them.
  6. Finally, we count unique words using the built-in function Count.perElement().
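Note that trim() and isStopWord() are not Beam functions; they are helper methods of our own that the pipeline refers to, and their implementation isn’t shown in the snippet above. A minimal sketch of what they could look like (the stopword list and regular expression here are purely illustrative):

// Illustrative helpers assumed by the pipeline above; not part of the Beam SDK
// (requires java.util.Arrays, java.util.HashSet and java.util.Set)
private static final Set<String> STOP_WORDS =
    new HashSet<>(Arrays.asList("is", "by", "a", "an", "the", "and", "or", "of", "to"));

static String trim(String word) {
    // strip leading and trailing non-word characters, e.g. "word!" becomes "word"
    return word.replaceAll("^\\W+|\\W+$", "");
}

static boolean isStopWord(String word) {
    return STOP_WORDS.contains(word);
}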

As mentioned earlier, pipelines are processed on a distributed backend. It’s not possible to iterate over a PCollection in memory, since its elements may be distributed across multiple workers. Instead, we write the results to an external database or file.

First, we convert each element of our PCollection to a String. Then, we use TextIO to write the output:

wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath));
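By default, TextIO splits the output into several shard files. For a small local test we could optionally force a single output file; this tweak is not part of the original pipeline:

// Optional variant: write everything into one file instead of several shards
wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath).withoutSharding());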

Now that our Pipeline definition is complete, we can run and test it.

Running and Testing

So far, we’ve defined a Pipeline for the word count task. At this point, let’s run the Pipeline:

p.run().waitUntilFinish();

On this line of code, Apache Beam hands our pipeline to the selected runner, here the local DirectRunner. Since TextIO writes sharded output by default, several output files will be generated at the end. They’ll contain things like:

...
apache --> 3
beam --> 5
rocks --> 2
...
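Besides running the pipeline, the Beam SDK also ships with testing utilities such as TestPipeline and PAssert. As a sketch of how the counting step could be unit tested, assuming JUnit 4 and the DirectRunner are on the classpath (the class and test names below are our own):

import java.util.Arrays;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class WordCountTest {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void countsEachUniqueWord() {
        PCollection<KV<String, Long>> counts = pipeline
            .apply(Create.of(Arrays.asList("apache", "beam", "beam")))
            .apply(Count.perElement());

        // Assert on the contents of the PCollection instead of iterating over it
        PAssert.that(counts).containsInAnyOrder(
            KV.of("apache", 1L), KV.of("beam", 2L));

        pipeline.run().waitUntilFinish();
    }
}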

Conclusion

In this blog, we learned what Apache Beam is and why it’s preferred over alternatives. We also demonstrated basic concepts of Apache Beam with a word count example.

Reference: https://beam.apache.org/

Written by 

KRISHNA JAISWAL is a Software Consultant Trainee at Knoldus. He is passionate about Java and MySQL, and also has knowledge of C, C++, and much more. He is recognised as a good team player, a dedicated and responsible professional, and a technology enthusiast. He is a quick learner and curious to learn new technologies. His hobbies include reading books, listening to music, and playing cricket.