How to implement Data Pipelines with the help of Beam


In this blog, I take a closer look at the Apache Beam data processing model, the structure of its data pipelines, and how to build and run them.

Apache Beam

Apache Beam is one of the more recent projects from Apache: a unified programming model for expressing efficient data processing pipelines. It is an open-source model for defining both batch and streaming data-parallel processing pipelines. The Apache Beam programming model simplifies the mechanics of large-scale data processing. Using one of the Apache Beam SDKs, you build a program that defines the pipeline.



pip install apache-beam

Google Cloud Platform

pip install apache-beam[gcp]

Amazon Web Services

pip install apache-beam[aws]

What Is a Data Pipeline

A data pipeline encapsulates a data processing task: it reads input data, transforms it, and writes the result.

A Beam program often starts by creating a Pipeline object.

In the Beam SDKs, each pipeline is represented by an explicit object of type Pipeline. Each Pipeline object is an independent entity that encapsulates both the data the pipeline operates over and the transforms that get applied to that data.

To create a pipeline, declare a Pipeline object, and pass it some configuration options.

// Start by defining the options for the pipeline.

PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.

Pipeline p = Pipeline.create(options);
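The snippet above uses the Java SDK. Since the rest of this blog uses Python, here is a minimal sketch of the same two steps in the Python SDK, assuming `apache-beam` is installed. The function name `create_pipeline` is only illustrative.

```python
def create_pipeline(flags=None):
    """Build a Beam pipeline from a list of command-line style flags."""
    # Beam is imported inside the function so this module can be loaded
    # even where apache-beam is not installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Start by defining the options for the pipeline.
    # An explicit empty list keeps Beam from parsing sys.argv.
    options = PipelineOptions(flags or [])

    # Then create the pipeline.
    return beam.Pipeline(options=options)
```

Calling `create_pipeline()` returns a pipeline with default options; nothing is executed until the pipeline is run.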

Features of Apache Beam

Apache Beam comprises four basic features:

  • Pipeline
  • Runner
  • PCollection
  • PTransform

A Pipeline is responsible for reading, processing, and saving the data. The whole cycle, from the input through to the output, is the pipeline. Every Beam program creates a Pipeline.

The second feature of Beam is a Runner. It determines where this pipeline will operate.
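As a rough sketch of how a runner can be chosen in the Python SDK, the options object can be viewed as `StandardOptions` and its `runner` attribute set. `DirectRunner` executes the pipeline on the local machine; the function name `create_pipeline_on` is an assumption made for this example.

```python
def create_pipeline_on(runner="DirectRunner"):
    """Create a pipeline that will execute on the named runner."""
    # Imported lazily so the module loads without apache-beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import (
        PipelineOptions,
        StandardOptions,
    )

    options = PipelineOptions([])
    # The runner decides where the pipeline operates,
    # for example DirectRunner, DataflowRunner, FlinkRunner or SparkRunner.
    options.view_as(StandardOptions).runner = runner
    return beam.Pipeline(options=options)
```

The pipeline code itself stays the same when the runner changes; only the options differ.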

The third feature of Beam is the PCollection, which is roughly comparable to an RDD or a DataFrame in Spark. The pipeline creates a PCollection by reading data from a data source, and after that, more PCollections are produced as PTransforms are applied to it.

Each PTransform applied to a PCollection produces a new PCollection, which makes PCollections immutable. Once a PCollection is constructed, you cannot add, remove, or change individual elements. The elements of a PCollection can be of any type, but all must be of the same type. To support distributed processing, Beam encodes each element as a byte string so that it can pass elements between distributed workers.

The Beam SDKs provide encoders (coders) for commonly used types and support custom encodings. In addition, a PCollection does not support fine-grained operations, so we cannot apply a transform to only some specific items in a PCollection. Every transform is applied to the whole PCollection.
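The immutability described above can be checked with a small sketch. It assumes `apache-beam` is installed and uses Beam's own testing helpers `assert_that` and `equal_to`; the sample numbers and the helper `square` are made up for the example.

```python
def square(x):
    """Element-wise function applied to every item of the PCollection."""
    return x * x


def run():
    # Imported lazily so the helper above is usable without apache-beam.
    import apache_beam as beam
    from apache_beam.testing.util import assert_that, equal_to

    with beam.Pipeline() as p:
        numbers = p | "CreateNumbers" >> beam.Create([1, 2, 3])

        # The transform runs over the whole PCollection and returns a NEW one.
        squares = numbers | "Square" >> beam.Map(square)

        # The input PCollection still holds the original elements.
        assert_that(numbers, equal_to([1, 2, 3]), label="CheckNumbers")
        assert_that(squares, equal_to([1, 4, 9]), label="CheckSquares")
```

Both checks pass when `run()` is called, because `beam.Map` never modifies `numbers` in place.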

Timestamp in PCollection

The source usually assigns a timestamp to each new element when the element is read or added. If a PCollection holds bounded data, every element may be set to the same timestamp. You can specify the timestamp explicitly, or Beam will provide its own. In either case, we can assign timestamps to the elements manually if the source does not do it for us.
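A minimal sketch of assigning timestamps manually, assuming each record is a string of the form `unix_seconds,message`. That record format and the helper `extract_timestamp` are assumptions for illustration; `beam.window.TimestampedValue` is the Beam class that attaches a timestamp to an element.

```python
def extract_timestamp(record):
    """Return the Unix timestamp stored in the first field of the record."""
    return int(record.split(",")[0])


def run():
    # Imported lazily so the helper above is usable without apache-beam.
    import apache_beam as beam

    class AddTimestamp(beam.DoFn):
        def process(self, element):
            # Emit the same element, now carrying an explicit timestamp.
            yield beam.window.TimestampedValue(
                element, extract_timestamp(element)
            )

    with beam.Pipeline() as p:
        (
            p
            | "CreateEvents" >> beam.Create(
                ["1700000000,login", "1700000060,logout"]
            )
            | "AddTimestamps" >> beam.ParDo(AddTimestamp())
            | "Print" >> beam.Map(print)
        )
```

The elements themselves are unchanged; the timestamp travels alongside each one and is later used by windowing.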

The fourth feature of Beam is the PTransform. A PTransform represents one processing step: it takes one or more PCollections as input, applies an operation to their elements, and produces zero or more new PCollections as output. Transforms run in parallel across workers and cover operations such as mapping, filtering, grouping, windowing, and assigning timestamps.

Pipeline Structure of Beam

In this section, I will implement the pipeline structure of Beam. The first step is to `create the pipeline` and assign it to a name, which is a mandatory line of code.

pipeline1 = beam.Pipeline()

The second step is to `create` the initial PCollection by reading from a file, stream, or database.

dept_count = ( 
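The read step is cut off above, so here is a small sketch of what creating an initial PCollection can look like. It uses `beam.Create` with a few made-up in-memory lines instead of a real file; `SAMPLE_LINES` and its column layout (id, name, code, department) are assumptions for illustration only.

```python
# Hypothetical sample records: id, name, code, department.
SAMPLE_LINES = [
    "1,Alice,10,Backend",
    "2,Bob,20,Frontend",
    "3,Alice,10,Backend",
]


def run():
    # Imported lazily so SAMPLE_LINES can be inspected without apache-beam.
    import apache_beam as beam

    with beam.Pipeline() as pipeline1:
        # beam.Create builds a PCollection from an in-memory list.
        lines = pipeline1 | "CreateLines" >> beam.Create(SAMPLE_LINES)

        # For a real text file, beam.io.ReadFromText(<path>) plays the
        # same role and yields one element per line.
        lines | "Print" >> beam.Map(print)
```

Either way, the result is a PCollection of strings that the later transforms can work on.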




The third step is to `apply` PTransforms according to our use case. We can use several transforms in this pipeline; each one is applied with a pipe operator.

dept_count = (



 | beam.Map(lambda line: line.split(','))
 | beam.Filter(lambda line: line[3] == 'Backend')
 | beam.Map(lambda line: (line[1], 1))
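Since parts of the snippet above are missing, the following is a self-contained sketch of the same three transforms. The input lines are invented sample data, and the final `beam.CombinePerKey(sum)` step is my assumption about how the pairs would be counted; it is not taken from the original code.

```python
# Hypothetical sample records: id, name, code, department.
SAMPLE_LINES = [
    "1,Alice,10,Backend",
    "2,Bob,20,Frontend",
    "3,Alice,10,Backend",
]


def split_line(line):
    """Turn one CSV line into a list of fields."""
    return line.split(",")


def is_backend(fields):
    """Keep only records whose fourth field is 'Backend'."""
    return fields[3] == "Backend"


def to_pair(fields):
    """Map a record to a (name, 1) pair so it can be counted per key."""
    return (fields[1], 1)


def run():
    # Imported lazily so the helpers above are usable without apache-beam.
    import apache_beam as beam

    with beam.Pipeline() as pipeline1:
        dept_count = (
            pipeline1
            | beam.Create(SAMPLE_LINES)
            | beam.Map(split_line)
            | beam.Filter(is_backend)
            | beam.Map(to_pair)
            | beam.CombinePerKey(sum)  # assumed counting step
        )
        dept_count | beam.Map(print)
```

Using named functions instead of lambdas makes each step easy to test on its own before it goes into the pipeline.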



To invoke a transform, you apply it to the input PCollection. Every transform is applied through the same generic mechanism: in the Java SDK this is the `.apply` method, while in the Python SDK the ` | ` pipe operator plays that role.
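A short sketch of the pipe operator in the Python SDK. It also shows the optional `'Label' >> transform` form, and that a user-defined composite transform is applied exactly like a built-in one. The class `UpperCaseWords` and the sample words are hypothetical.

```python
def to_upper(word):
    """Upper-case a single word."""
    return word.upper()


def run():
    # Imported lazily so the helper above is usable without apache-beam.
    import apache_beam as beam

    class UpperCaseWords(beam.PTransform):
        """Hypothetical composite transform, used only for illustration."""

        def expand(self, pcoll):
            # A composite transform is built from other transforms.
            return pcoll | "ToUpper" >> beam.Map(to_upper)

    with beam.Pipeline() as p:
        words = p | "CreateWords" >> beam.Create(["beam", "pipeline"])

        # 'Label' >> transform names the step; | applies it to the input.
        shouted = words | "Shout" >> UpperCaseWords()
        shouted | "Print" >> beam.Map(print)
```

Labels must be unique within a pipeline, which is why each step here gets its own name.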

After all transforms, the fourth step is to write the final PCollection to an external sink. It can be a file, database, or stream.

dept_count = (



 | beam.Map(lambda line: line.split(','))
 | beam.Filter(lambda line: line[3] == 'Backend')
 | beam.Map(lambda line: (line[1], 1))



The final step is to run the pipeline.
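The write and run steps can be sketched together as follows. The (name, count) pairs are invented sample data, and `output_prefix` is a path you choose; `beam.io.WriteToText` writes one or more sharded files that start with that prefix.

```python
def format_pair(pair):
    """Turn a (name, count) pair into one CSV line."""
    name, count = pair
    return f"{name},{count}"


def run(output_prefix):
    # Imported lazily so the helper above is usable without apache-beam.
    import apache_beam as beam

    pipeline1 = beam.Pipeline()
    (
        pipeline1
        | "CreateCounts" >> beam.Create([("Alice", 2), ("Bob", 1)])
        | "Format" >> beam.Map(format_pair)
        # Step four: write the final PCollection to a text sink.
        | "Write" >> beam.io.WriteToText(output_prefix)
    )

    # Final step: up to here the pipeline is only a description.
    # run() executes it, and wait_until_finish() blocks until it is done.
    result = pipeline1.run()
    result.wait_until_finish()
```

Creating the pipeline in a `with beam.Pipeline() as p:` block is an alternative: the pipeline is then run automatically when the block exits.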

Pipeline Branch Operations

Most pipelines represent a linear flow of operations with a one-to-one mapping. After the first PCollection, one filter operation produces one new PCollection. On that PCollection, one map transform creates another PCollection, and so on down the chain until the result is written to a file.

However, in many use cases the pipeline is considerably more complex and branched. Beam calls this a branched pipeline: the same PCollection is used as input for multiple transforms.
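A minimal sketch of a branched pipeline, assuming made-up records with the department in the fourth field. The same `rows` PCollection feeds two independent filters, and each branch produces its own PCollection.

```python
# Hypothetical sample records: id, name, code, department.
SAMPLE_LINES = [
    "1,Alice,10,Backend",
    "2,Bob,20,AI",
    "3,Carol,10,Backend",
]


def is_backend(fields):
    return fields[3] == "Backend"


def is_ai(fields):
    return fields[3] == "AI"


def run():
    # Imported lazily so the helpers above are usable without apache-beam.
    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = (
            p
            | "CreateLines" >> beam.Create(SAMPLE_LINES)
            | "Split" >> beam.Map(lambda line: line.split(","))
        )

        # Two branches read from the same PCollection.
        backend = rows | "KeepBackend" >> beam.Filter(is_backend)
        ai = rows | "KeepAI" >> beam.Filter(is_ai)

        backend | "PrintBackend" >> beam.Map(lambda r: print("Backend", r))
        ai | "PrintAI" >> beam.Map(lambda r: print("AI", r))
```

Because PCollections are immutable, one branch can never affect what the other branch sees.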

Structure of Streaming Data Pipeline in Beam

The core idea of Beam is to provide unified big data processing pipelines. As its official documentation states, the unified model lets you build both batch and streaming pipelines with a single API.

When we create a Pipeline, we can also set some configuration options associated with it, such as the pipeline runner, which will execute our pipeline and any runner-specific configuration required by the chosen runner.
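As a sketch of how the runner and runner-specific settings can be expressed, Beam's Python SDK accepts them as command-line style flags. The helper `build_flags` is hypothetical; `--runner` is a standard Beam option, and `--direct_num_workers` is used here as an example of an option specific to the local `DirectRunner`.

```python
def build_flags(runner, **runner_specific):
    """Build a flag list: the runner first, then runner-specific settings."""
    flags = [f"--runner={runner}"]
    flags += [f"--{name}={value}" for name, value in runner_specific.items()]
    return flags


def create_pipeline(runner="DirectRunner", **runner_specific):
    # Imported lazily so build_flags is usable without apache-beam.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(build_flags(runner, **runner_specific))
    return beam.Pipeline(options=options)
```

For example, `build_flags("DirectRunner", direct_num_workers=2)` produces the same flags you would otherwise type on the command line.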

We can set the pipeline's configuration options by hardcoding them, but it is usually better to read them from the command line and pass them to the Pipeline object. If the pipeline takes the runner information and the input and output file paths from the command line, the same code works for any runner and any data set, and we obtain a generic pipeline.

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Read the input and output paths from the command line.
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=True,
                    help='Input file path, for example /content/data.txt')
parser.add_argument('--output', dest='output', required=True,
                    help='Output file path, for example /content/output.txt')

# Known arguments are the paths; everything else is passed on to Beam.
path_args, pipeline_args = parser.parse_known_args()
input_arguments = path_args.input
output_arguments = path_args.output

options = PipelineOptions(pipeline_args)
pipeline_with_options = beam.Pipeline(options=options)
dept_count = (pipeline_with_options


 | beam.Map(lambda line: line.split(','))
 | beam.Filter(lambda line: line[3] == 'AI')
 | beam.Map(lambda line: (line[1], 1))
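The read and write steps are missing from the snippet above, so here is a self-contained sketch of the whole command-line driven pipeline. The `ReadFromText`, `CombinePerKey(sum)`, and `WriteToText` steps are my assumptions about the missing parts, and the function names `parse_args` and `run` are illustrative.

```python
import argparse


def parse_args(argv=None):
    """Split the command line into path arguments and Beam arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=True,
                        help="Input file path")
    parser.add_argument("--output", dest="output", required=True,
                        help="Output file path prefix")
    # Unknown flags such as --runner are returned separately for Beam.
    return parser.parse_known_args(argv)


def run(argv=None):
    # Imported lazily so parse_args is usable without apache-beam.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    path_args, pipeline_args = parse_args(argv)
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as pipeline_with_options:
        (
            pipeline_with_options
            | "Read" >> beam.io.ReadFromText(path_args.input)
            | "Split" >> beam.Map(lambda line: line.split(","))
            | "KeepAI" >> beam.Filter(lambda fields: fields[3] == "AI")
            | "PairWithOne" >> beam.Map(lambda fields: (fields[1], 1))
            | "Count" >> beam.CombinePerKey(sum)  # assumed counting step
            | "Write" >> beam.io.WriteToText(path_args.output)
        )
```

With this in a script, the runner and the file paths all come from the command line, for example `--input data.txt --output result --runner=DirectRunner`, and the code does not change between environments.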




Written by 

He is a Software Consultant at Knoldus Inc. He holds a B.Tech from Dr. APJ Kalam Technical University, Uttar Pradesh. He has worked with several programming languages, including Java, C++, and Python, and is especially passionate about Java development and curious to learn Java technologies. He is always eager to learn new things, has good critical-thinking and problem-solving skills, and enjoys helping others. He likes to play outdoor games such as football, volleyball, hockey, and kabaddi. Apart from technology, he likes to read scriptures originating in ancient India, such as the Vedas, the Upanishads, and the Gita.
