Apache Beam: Pipeline Fundamentals

An introduction to pipeline fundamentals.

What is Apache Beam

Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines, including ETL, batch, and stream processing. The Beam programming model simplifies the mechanics of large-scale data processing.

What is a Beam Pipeline

A Beam pipeline is a graph of all the data and computations in your data processing task. This includes reading input data, transforming that data, and writing output data. You construct a pipeline in the SDK of your choice. The pipeline then reaches the runner either directly through the SDK or through the Runner API’s RPC interface. For example, this diagram shows a branching pipeline:

[Figure: a branching Beam pipeline]

What to consider when designing your pipeline

When designing your Beam pipeline, consider a few basic questions:

  • Where is your input data stored? How many sets of input data do you have? This will determine what kinds of Read transforms you’ll need to apply at the start of your pipeline.
  • What does your data look like? It might be plaintext, formatted log files, or rows in a database table. Some Beam transforms work exclusively on PCollections of key/value pairs; you’ll need to determine if and how your data is keyed and how to best represent that in your pipeline’s PCollection(s).
  • What do you want to do with your data? The core transforms in the Beam SDKs are general purpose. Knowing how you need to change or manipulate your data will determine how you build core transforms like ParDo, or when you use pre-written transforms included with the Beam SDKs.
  • What does your output data look like, and where should it go? This will determine what kinds of Write transforms you’ll need to apply at the end of your pipeline.

A basic pipeline

[Figure: a basic Beam pipeline]

The simplest pipeline is a linear sequence of steps: read the input, transform it, and write the output. However, your pipeline can be significantly more complex. A pipeline represents a directed acyclic graph (DAG) of steps. It can have multiple input sources and multiple output sinks, and its operations (PTransforms) can both read and produce multiple PCollections.
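To make the graph idea concrete, here is a minimal plain-Java sketch. It does not use the Beam API: the class PipelineGraphSketch, its methods, and the step names (ReadLogs, ReadUsers, Join, WriteReport, WriteAlerts) are all hypothetical. It models steps as nodes of a graph with two sources and two sinks, and computes one valid execution order in which every step comes after all of its inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a pipeline graph. Illustration only, NOT the Beam API. */
final class PipelineGraphSketch {
    // Step name -> names of the steps that consume its output.
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    /** Records that the output of step "from" feeds step "to". */
    PipelineGraphSketch connect(String from, String to) {
        edges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        edges.computeIfAbsent(to, k -> new ArrayList<>());
        return this;
    }

    /** Returns the steps ordered so that each step follows all of its inputs (Kahn's algorithm). */
    List<String> executionOrder() {
        // Count how many inputs each step is still waiting for.
        Map<String, Integer> pendingInputs = new LinkedHashMap<>();
        for (String step : edges.keySet()) {
            pendingInputs.put(step, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                pendingInputs.merge(target, 1, Integer::sum);
            }
        }

        // Steps with no inputs are the sources; they can run first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : pendingInputs.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String step = ready.remove();
            order.add(step);
            for (String next : edges.get(step)) {
                if (pendingInputs.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // If some steps never became ready, the graph contains a cycle.
        if (order.size() != edges.size()) {
            throw new IllegalStateException("cycle detected: not a valid pipeline graph");
        }
        return order;
    }

    /** Hypothetical pipeline with two sources, one joining step, and two sinks. */
    static PipelineGraphSketch example() {
        return new PipelineGraphSketch()
            .connect("ReadLogs", "Join")
            .connect("ReadUsers", "Join")
            .connect("Join", "WriteReport")
            .connect("Join", "WriteAlerts");
    }

    public static void main(String[] args) {
        // Prints [ReadLogs, ReadUsers, Join, WriteReport, WriteAlerts]
        System.out.println(example().executionOrder());
    }
}
```

In a real Beam pipeline you do not compute this order yourself; the runner decides how to execute the graph. The sketch only shows why the "acyclic" part matters: a cycle would leave some steps waiting for inputs that can never be produced.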

Branching PCollections

It’s important to understand that transforms do not consume PCollections; instead, they consider each individual element of a PCollection and create a new PCollection as output. This way, you can do different things to different elements in the same PCollection.
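As a rough analogy in plain Java (not Beam code), think of a PCollection as an immutable list and a transform as a function that reads that list and returns a new one. The class BranchingSketch, the helper startingWith, and the sample rows below are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java analogy for branching. Illustration only, NOT the Beam API. */
final class BranchingSketch {

    /** Stand-in for a transform: reads every element and builds a new list. */
    static List<String> startingWith(List<String> input, String prefix) {
        return input.stream()
            .filter(element -> element.startsWith(prefix))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // List.of returns an immutable list, much like a PCollection cannot be modified.
        List<String> rows = List.of("Alice", "Bob", "Anna", "Carol");

        // The same input feeds two independent "transforms".
        List<String> aRows = startingWith(rows, "A");
        List<String> bRows = startingWith(rows, "B");

        System.out.println(rows);  // prints [Alice, Bob, Anna, Carol] (input is unchanged)
        System.out.println(aRows); // prints [Alice, Anna]
        System.out.println(bRows); // prints [Bob]
    }
}
```

Because neither call alters or uses up the input, any number of branches can read from the same collection.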

There are two ways to branch a pipeline. First, you can use the same PCollection as input for multiple transforms, without consuming or altering the input. Second, you can have a single transform output to multiple PCollections by using tagged outputs.

Example

The following example code applies two transforms to a single input collection.

PCollection<String> dbRowCollection = ...;

// Branch 1: keep only the rows that start with "A".
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element().startsWith("A")) {
      c.output(c.element());
    }
  }
}));

// Branch 2: read the same, unmodified input and keep only the rows that start with "B".
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element().startsWith("B")) {
      c.output(c.element());
    }
  }
}));

The following example code applies one transform that processes each element once and outputs two collections.

final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};

PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if (c.element().startsWith("B")) {
              // Emit to the additional output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
            // Elements that start with neither "A" nor "B" are not emitted.
          }
        })
        // The main output is the one with tag startsWithATag; additional outputs
        // (here only startsWithBTag) are passed as a TupleTagList.
        .withOutputTags(startsWithATag, TupleTagList.of(startsWithBTag)));

// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);

// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
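The routing logic of the tagged-output example can be sketched in plain Java, without Beam. This is only an analogy: the class TaggedOutputSketch, the string tags, and the sample fruit names are hypothetical, and a map of lists stands in for the PCollectionTuple. It shows that each element is inspected once and lands in at most one output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java analogy for tagged outputs. Illustration only, NOT the Beam API. */
final class TaggedOutputSketch {
    // Hypothetical tag names standing in for the TupleTag objects.
    static final String STARTS_WITH_A = "startsWithA";
    static final String STARTS_WITH_B = "startsWithB";

    /** Makes a single pass over the input and routes each element to at most one output. */
    static Map<String, List<String>> route(List<String> input) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        outputs.put(STARTS_WITH_A, new ArrayList<>());
        outputs.put(STARTS_WITH_B, new ArrayList<>());
        for (String element : input) {
            if (element.startsWith("A")) {
                outputs.get(STARTS_WITH_A).add(element);
            } else if (element.startsWith("B")) {
                outputs.get(STARTS_WITH_B).add(element);
            }
            // Elements matching neither prefix are dropped, as in the example above.
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Prints {startsWithA=[Apple, Avocado], startsWithB=[Banana]}
        System.out.println(route(List.of("Apple", "Banana", "Cherry", "Avocado")));
    }
}
```

For this particular filter, the result matches what the two separate transforms produce. The difference is that the input is traversed once instead of twice.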

References

https://beam.apache.org/documentation/pipelines/design-your-pipeline/

https://www.macrometa.com/event-stream-processing/apache-beam-tutorial#Section-6
