In this blog, I will take a deeper look at this data processing model, explore its pipeline structure, and show how to build and run pipelines with it.
Apache Beam is one of the newer Apache projects: an open-source, unified programming model for defining both batch and streaming data-parallel processing pipelines. The Apache Beam programming model simplifies the mechanics of large-scale data processing. Using one of the Apache Beam SDKs, you build a program that defines the pipeline.
pip install apache-beam

For Google Cloud Platform:

pip install apache-beam[gcp]

For Amazon Web Services:

pip install apache-beam[aws]
What is a Data Pipeline?
A data pipeline encapsulates a data processing task by transforming input data into output data.
A Beam program often starts by creating a Pipeline. In the Beam SDKs, each pipeline is represented by an explicit object of type Pipeline. Each Pipeline object is an independent entity that encapsulates both the data the pipeline operates over and the transforms that get applied to that data.

To create a pipeline, declare a Pipeline object and pass it some configuration options.
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Start by defining the options for the pipeline.
options = PipelineOptions()

# Then create the pipeline.
p = beam.Pipeline(options=options)
```
Features of Apache Beam
Apache Beam comprises four basic features:
The first feature is the Pipeline, which is responsible for reading, processing, and saving the data. This whole cycle, from input all the way through to output, is a pipeline, and every Beam program creates one.
The second feature of Beam is the Runner. It determines where the pipeline will execute, for example locally with the DirectRunner or on a distributed back end such as Dataflow, Flink, or Spark.
The third feature of Beam is the PCollection, which is roughly equivalent to an RDD or DataFrame in Spark. The pipeline creates an initial PCollection by reading data from a data source, and after that, more PCollections are produced as PTransforms are applied.
Applying a transform to a PCollection results in a new PCollection, which makes PCollections immutable: once constructed, you cannot modify individual items in a PCollection; a transformation always produces a new PCollection instead. The elements in a PCollection can be of any type, but all must be of the same type. To support distributed processing, Beam encodes each element as a byte string so that it can pass items around to distributed workers.

The Beam SDK packages also provide an encoding mechanism (coders) for commonly used types, with support for custom encodings. In addition, a PCollection does not support fine-grained operations: we cannot apply a transformation to only some specific items in a PCollection; every transform applies to the PCollection as a whole.
Timestamps in a PCollection

Commonly, the source assigns a timestamp to each new element when the element is read or added. If a PCollection holds bounded data, every element may be set to the same timestamp. You can specify the timestamp explicitly, or Beam will provide its own. In either case, we can manually assign timestamps to the elements if the source does not do it for us.
The fourth feature of Beam is the PTransform. It takes a PCollection as input and produces one or more new PCollections as output. Transforms can operate in parallel and perform operations such as mapping, filtering, windowing, assigning watermarks, etc.
Pipeline Structure of Beam
In this section, I will implement the pipeline structure of Beam. The first step is `assigning the pipeline a name`, a mandatory line of code.
```python
import apache_beam as beam

pipeline1 = beam.Pipeline()
```
The second step is to `create` the initial PCollection by reading from a file, stream, or database.
```python
dept_count = (
    pipeline1
    | beam.io.ReadFromText('/content/input_data.txt')
)
```
According to our use case, the third step is to `apply` PTransforms. We can use several transforms in this pipeline, each applied with the pipe operator.
```python
dept_count = (
    pipeline1
    | beam.io.ReadFromText('/content/input_data.txt')
    | beam.Map(lambda line: line.split(','))
    # Keep only rows for the 'Backend' department; here we assume the
    # department name is the second comma-separated field of each row.
    | beam.Filter(lambda fields: fields[1] == 'Backend')
    | beam.Map(lambda fields: (fields[1], 1))
    | beam.CombinePerKey(sum)
)
```
To request a transform operation, you apply it to the input PCollection. For every transform, there exists a generic apply method; we can invoke it either as `.apply` or with the ` | ` pipe operator.
After all transforms, the fourth step is to write the final PCollection to an external sink, which can be a file, database, or stream.
```python
dept_count = (
    pipeline1
    | beam.io.ReadFromText('/content/input_data.txt')
    | beam.Map(lambda line: line.split(','))
    # As before, assume the department is the second field of each row.
    | beam.Filter(lambda fields: fields[1] == 'Backend')
    | beam.Map(lambda fields: (fields[1], 1))
    | beam.CombinePerKey(sum)
    | beam.io.WriteToText('/content/output_data.txt')
)
```
The final step is to run the pipeline.
Pipeline Branch Operations
Most pipelines represent a linear flow of operations with one-to-one mapping: the first PCollection passes through one filter operation, which produces one new PCollection; one map transform then creates another PCollection, and so on until the result is written to a file.

However, for many use cases our pipeline can be significantly more complex and branched. This type of pipeline is called a branched pipeline in Beam, where we can use the same PCollection as input for multiple transforms.
Structure of Streaming Data Pipeline in Beam
The core idea of Beam is to provide consolidated big data processing pipelines. Its unified nature makes it possible to build both batch and streaming pipelines with a single API, as stated in its official documentation.
When we create a Pipeline, we can also set configuration options for it, such as the pipeline runner that will execute our pipeline and any runner-specific configuration required by the chosen runner.
We could assign the pipeline's configuration preferences by hardcoding them, but it is often advisable to read them from the command line and pass them to the Pipeline object. If we build a pipeline that takes the runner and the input and output file paths from the command line, our problem is solved and we obtain a generic pipeline.
```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=True,
                    help='Input file path, e.g. /content/data.txt')
parser.add_argument('--output', dest='output', required=True,
                    help='Output file path, e.g. /content/output.txt')

path_args, pipeline_args = parser.parse_known_args()
input_arguments = path_args.input
output_arguments = path_args.output

# Any remaining flags (e.g. --runner) are passed through to Beam.
options = PipelineOptions(pipeline_args)
pipeline_with_options = beam.Pipeline(options=options)

dept_count = (
    pipeline_with_options
    | beam.io.ReadFromText(input_arguments)
    | beam.Map(lambda line: line.split(','))
    # Assume the department name is the second comma-separated field.
    | beam.Filter(lambda fields: fields[1] == 'AI')
    | beam.Map(lambda fields: (fields[1], 1))
    | beam.io.WriteToText(output_arguments)
)

pipeline_with_options.run()
```