Apache Beam ParDo Transformations

What is a PCollection?

A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source such as a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs of each step in your pipeline.
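For instance, here is a minimal sketch of creating a PCollection from in-memory data with Beam's Create transform (the class name and element values are just illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CreateExample { // illustrative class name
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // build an initial PCollection from an in-memory list of words
        PCollection<String> words =
            pipeline.apply(Create.of("apple", "banana", "cherry"));

        pipeline.run().waitUntilFinish();
    }
}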

What is ParDo?

ParDo is a general-purpose transform for parallel processing. It is quite flexible and allows you to perform everyday data processing tasks. Unlike the MapElements transform, which produces exactly one output for each input element of a collection, ParDo gives us the flexibility to return zero or more outputs for each input element.
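To illustrate the difference, here is a sketch of a DoFn that splits each line into words, assuming an existing PCollection<String> named lines: a blank line yields zero outputs, while a line with several words yields several:

// Emit zero or more outputs per input element: a blank line produces
// nothing, a line with N words produces N output elements.
PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<String> out) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.output(word);
            }
        }
    }
}));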

Example

We will use a ParDo transform to produce a collection in which only the words with a count greater than 100 are present; words with a smaller frequency will be discarded.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;

public class FilterWordsPipeline { // illustrative class name
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            // read the input file line by line
            .apply(TextIO.read().from("./words.txt"))
            // CountWords (defined elsewhere, as in Beam's WordCount example) is a
            // composite transform producing a PCollection<KV<String, Long>> of word frequencies
            .apply(new CountWords())
            // keep only the words that appear more than 100 times
            .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
                @ProcessElement
                public void processElement(@Element KV<String, Long> input,
                                           OutputReceiver<KV<String, Long>> outputReceiver) {
                    if (input.getValue() > 100) {
                        outputReceiver.output(input);
                    }
                }
            }))
            // format each key-value pair as "word: count"
            .apply(MapElements.into(TypeDescriptor.of(String.class))
                .via((KV<String, Long> kv) -> String.format("%s: %d", kv.getKey(), kv.getValue())))
            // write the formatted lines to files prefixed with "output"
            .apply(TextIO.write().to("output"));

        // run the pipeline and wait until the whole pipeline is finished
        pipeline.run().waitUntilFinish();
    }
}

We added a ParDo transform to discard words with counts <= 100. To apply a ParDo, we need to provide the user code in the form of a DoFn. A DoFn must specify the type of its input elements and the type of its output elements. In this case, both input and output have the same type.

Our user code goes inside a method annotated with @ProcessElement, which is executed once for each element in the input collection. To identify the input element, we annotate the corresponding parameter with @Element. To emit output, we also declare an OutputReceiver parameter. We can omit it if we don't plan to emit any values, e.g. when we only write the elements to a database (see the sketch below). This is not the only signature for a method annotated with @ProcessElement; later on, we'll see how to produce multiple PCollections as output and how to accept side inputs (additional input other than the element currently being processed).
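For instance, a minimal sketch of a DoFn that omits the OutputReceiver because it emits nothing, assuming counts is the PCollection<KV<String, Long>> from the example above; saveToDatabase is a hypothetical helper, not part of Beam:

// No OutputReceiver parameter: this DoFn only performs a side effect
// and emits no elements, so the output type is declared as Void.
counts.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
    @ProcessElement
    public void processElement(@Element KV<String, Long> input) {
        saveToDatabase(input.getKey(), input.getValue()); // hypothetical helper
    }
}));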

Conclusion

To summarize, ParDo:

  • calls a user-specified function (a DoFn)
  • applies its processing method to each element of the dataset, one by one
  • can process the dataset's elements in parallel when they are distributed across different workers
  • keeps each processed element's original timestamp and window
  • shares no global mutable state: it isn't possible to share mutable state among the executed functions. DoFn instances are serialized and sent to the workers as such, so even if they reference global variables (such as collections), each worker receives only a copy of those variables, not the variables themselves (see the sketch after this list)
  • is fault-tolerant: if it crashes, it is rerun.
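As a quick sketch of the "no global mutable state" point (names are illustrative, and words is assumed to be an existing PCollection<String>):

// The DoFn below is serialized and shipped to the workers, so seenWords is
// copied along with it; mutations happen on each worker's copy, never on
// the driver's original list.
List<String> seenWords = new ArrayList<>();

words.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(@Element String word, OutputReceiver<String> out) {
        seenWords.add(word); // modifies this worker's copy only
        out.output(word);
    }
}));
// after pipeline.run().waitUntilFinish(), seenWords in the driver is still empty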
