Apache Beam Transformation

Reading Time: 5 minutes

In this blog, we are going to see the various ways of transformation in the apache beam. Transforms are the operations in your pipeline and provide a generic processing framework. As we know Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. So generally we refer to the Transform as the PTransfarm. Here P stands for Parallel.

PTransform :-

A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input perform a processing function that you provide on the elements of that PCollection and produces zero or more output PCollection objects.

transformation in apache beam

Core Beam transformation

Beam provides the following core transformation, each of which represents a different processing paradigm:

  • ParDo
  • GroupByKey
  • CoGroupByKey
  • Combine
  • Flatten
  • Partition

ParDo:-

ParDo is a core element-wise transform in apache beam. Invoking a user in specified functions on each of the elements of the input collection to produce zero or more output elements.

Therefore ParDo is useful for a variety of common data processing operations, like

  • Filtering a data set
  • Formatting or type-converting each element in a data set
  • Extracting parts of each element in a data set
  • Performing computations on each element in a data set

Now Let’s understand with an example. Here we have a pardo_cust.csv file and we are filtering that customer who lives in Delhi.

//here is data of input file for reference
/*
id,name,lastName,city
101,Aditya,Rai,Delhi
102,Akash,Tewatiya,Noida
103,Praksh,singh,Mumbai
104,Rohit,Mishra,Noida
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class CustFilter extends DoFn<String,String>{
   @ProcessElement
   public void processElement(ProcessContext c){
        String line = c.element();
        String[] arr = line.split(",");
        if(arr[3].equals("Delhi")) {
            c.output(line);
        }
    }
}
public class ParDoExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> pCollection = pipeline.apply(TextIO.read().from("src/main/resources/pardo_cust.csv"));
        //using ParDo
        PCollection<String> pOutput = pCollection.apply(ParDo.of(new CustFilter()));
        pOutput.apply(TextIO.write().to("src/main/resources/pardo_cust_out.csv").withHeader("id , name , lastName , city").withNumShards(1).withSuffix(".csv"));
        pipeline.run();
    }
}
// and here is data of output file we got
/*
id , name , lastName , city
101,Aditya,Rai,Delhi
*/

Group By Key:-

GroupByKey is a good way to aggregate data that has something in common. It can group all the values associated with a particular key.So If you are using unbounded PCollections, you must use either non global windowing or an aggregation trigger in order to perform a GroupByKey or CoGroupByKey. This is because a bounded GroupByKey or CoGroupByKey must wait for all the data with a certain key to be collected, but with unbounded collections, the data is unlimited.

/*
here is data of input file
cId,oId,pId,amount
1001,1,1,300
1002,1,1,400
1001,3,2,500
1003,4,3,200
1001,2,1,150
1002,2,4,900
1003,2,5,550
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
class StringToKV extends DoFn<String,KV<String,Integer>>{
    @ProcessElement
    public void processElement(ProcessContext c){
        String input = c.element();
        String arr[] = input.split(",");
        c.output(KV.of(arr[0],Integer.valueOf(arr[3])));
    }
}
class KVToString extends DoFn<KV<String,Iterable<Integer>>,String>{
    @ProcessElement
    public void processElement(ProcessContext c){
        String strKey = c.element().getKey();
        Iterable<Integer> values = c.element().getValue();
        Integer sum =0;
        for (Integer integer:values){
            sum+=integer;
        }
        c.output(strKey+","+sum.toString());
    }
}
public class GroupByKeyEx {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> pCollection = pipeline.apply(TextIO.read().from("src/main/resources/GroupByKey_data.csv"));
        // convert String to Key Value Pair
        PCollection<KV<String,Integer>> pOutput = pCollection.apply(ParDo.of(new StringToKV()));
        //Applying Group By key
        PCollection<KV<String,Iterable<Integer>>> kvpCollection = pOutput.apply(GroupByKey.<String,Integer>create());
        //converting kvpCollection value in string
       PCollection<String> output = kvpCollection.apply(ParDo.of(new KVToString()));
       output.apply(TextIO.write().to("src/main/resources/GroupByKey_out_data.csv").withHeader("cId,amount").withNumShards(1).withSuffix(".csv"));
        pipeline.run();
    }
}
/*
Here is generated output
cId,amount
1003,750
1002,1300
1001,950
*/

Co Group By Key:-

CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type. For example, let’s say you have two different files with user data: one file has userId, orderId, productId, amount. The other file has userName and userId. You can join those two data sets, using the userId as a common key and the other data as the associated values. After the join, you have one data set that contains all of the information associated with userId.

/*
Here is data of first input file
1001,1,1,300
1002,1,1,400
1003,4,3,200
1005,2,5,350
Here is data of first input file
1001,John
1002,Steve
1003,Ajay
1004,Swati
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

class OrderParsing extends DoFn<String,KV<String,String>>{
    @ProcessElement
    public void processElement(ProcessContext c){
        String arr[] = c.element().split(",");
        String strKey=arr[0];
        String strVal =arr[1]+","+arr[2]+","+arr[3];
        c.output(KV.of(strKey,strVal));
    }
}
class UserParsing extends DoFn<String,KV<String,String>>{
    @ProcessElement
    public void processElement(ProcessContext c){
        String arr[] = c.element().split(",");
        String strKey=arr[0];
        String strVal =arr[1];
        c.output(KV.of(strKey,strVal));
    }
}
public class CoGroupByEx {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // Converting String to KV Object
        PCollection<KV<String,String>> pOrderCollection = pipeline.apply(TextIO.read().from("src/main/resources/user_order.csv")).apply(ParDo.of(new OrderParsing()));
        PCollection<KV<String,String>> pUserCollection = pipeline.apply(TextIO.read().from("src/main/resources/p_user.csv")).apply(ParDo.of(new UserParsing()));
        //Creating TupleTag Object
        final TupleTag<String> orderTuple = new TupleTag<String>();
        final TupleTag<String> userTuple = new TupleTag<String>();
        //combining datasets using CoGroupByKey
       PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple.of(orderTuple,pOrderCollection).and(userTuple,pUserCollection).apply(CoGroupByKey.<String>create());
       //iterating CoGbkResult and build String
        PCollection<String> output = result.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c){
                String strKey = c.element().getKey();
                CoGbkResult valObject = c.element().getValue();
                Iterable<String> orderTable = valObject.getAll(orderTuple);
                Iterable<String> userTable = valObject.getAll(userTuple);
                for (String order:orderTable){
                    for (String user : userTable){
                        c.output(strKey+","+order+","+user);
                    }
                }
            }
        }));
        //step 5: save the result
        output.apply(TextIO.write().to("src/main/resources/CoGroupByKey.csv").withNumShards(1).withSuffix(".csv"));
        pipeline.run();
    }
}
/*
Here is output data as we have applied inner join
1003,4,3,200,Ajay
1001,1,1,300,John
1002,1,1,400,Steve
*/

Combine:-

Combine is a Beam transform for combining collections of elements or values in your data. It have variants that work on entire PCollections.Let’s see a simple example to understand how combine works.

import org.apache.beam.sdk.transforms.SerializableFunction;
import java.util.ArrayList;
import java.util.List;

class CombineExample implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
        int sum = 0;
        for (int item : input) {
            sum += item;
        }
        return sum;
    }
}
public class SumInts {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(10);
        list.add(20);
        list.add(30);
        CombineExample combineExample = new CombineExample();
        int result = combineExample.apply(list);
        System.out.println(result);
    }
}
// output = 60

Flatten:-

Flatten is a Beam transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection so according to defenition flatten combines multiple Pcollection into single Pcollection Object.

/*
Here is all 3 input files which we are going to combine
file-1
1001,James,Butt,Los Angeles
1002,Art,Venere,Los Angeles
file-2
1003,Steve,Jobs,Phoenix
1004,James,Hopes,Los Angeles
file-3
1005,Ajay,Sharma,Phoenix
1006,Amit,Wilson,Los Angeles
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenExample {
    public static void main(String[] args) {
        Pipeline pipeline =Pipeline.create();
        PCollection<String> pCustList1 = pipeline.apply(TextIO.read().from("src/main/resources/customer_1.csv"));
        PCollection<String> pCustList2 = pipeline.apply(TextIO.read().from("src/main/resources/customer_2.csv"));
        PCollection<String> pCustList3 = pipeline.apply(TextIO.read().from("src/main/resources/customer_3.csv"));
        PCollectionList<String> list = PCollectionList.of(pCustList1).and(pCustList2).and(pCustList3);
        PCollection<String> merged =list.apply(Flatten.pCollections());
        merged.apply(TextIO.write().to("src/main/resources/flatten_customer_output.csv").withNumShards(1).withSuffix(".csv"));
        pipeline.run();
    }
}
//here is combined output 
1004,James,Hopes,Los Angeles
1003,Steve,Jobs,Phoenix
1006,Amit,Wilson,Los Angeles
1005,Ajay,Sharma,Phoenix
1001,James,Butt,Los Angeles
1002,Art,Venere,Los Angeles

Partition:-

As we have seen in above example we were combining multiple Pcollection into single Pcollection object but here we are going to implement reverse functionality That means one PCollection Object can be broken into multiple Pcollection Object on the behalf of particular property.

/*
Here is input file which we are going to divide into multiple file on behalf of city
1001,James,Butt,Los Angeles
1002,Art,Venere,Phoenix
1003,Lenna,Foller,New York
1004,Tony,Jobs,Los Angeles
1005,Ajay,Sharma,Phoenix
1006,Amit,Talwar,New York
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class CityPartition implements Partition.PartitionFn<String>{

    @Override
    public int partitionFor(String elem, int numPartitions) {
        String arr[] = elem.split(",");
        if (arr[3].equals("Los Angeles")){
            return 0;
        }else if (arr[3].equals("Phoenix")){
            return 1;
        }else {
            return 2;
        }
    }
}
public class PartitionExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> pCollection = pipeline.apply(TextIO.read().from("src/main/resources/Partition.csv"));
        PCollectionList<String> partition = pCollection.apply(Partition.of(3,new CityPartition()));
        PCollection<String> p0 =partition.get(0);
        PCollection<String> p1 =partition.get(1);
        PCollection<String> p2 =partition.get(2);
        p0.apply(TextIO.write().to("src/main/resources/Partition1.csv").withNumShards(1).withSuffix(".csv"));
        p1.apply(TextIO.write().to("src/main/resources/Partition2.csv").withNumShards(1).withSuffix(".csv"));
        p2.apply(TextIO.write().to("src/main/resources/Partition3.csv").withNumShards(1).withSuffix(".csv"));
        pipeline.run();
    }
}
/*
Here all 3 generated output files
output-file-1
1001,James,Butt,Los Angeles
1004,Tony,Jobs,Los Angeles
output-file-2
1002,Art,Venere,Phoenix
1005,Ajay,Sharma,Phoenix
output-file-3
1003,Lenna,Foller,New York
1006,Amit,Talwar,New York
*/

So It’s all about this blog hope you have liked it If you want to explore more about apache beam transformation below I’m providing link feel free to visit this link.

Click here to Explore More

Written by 

I'm a Software Consultant at Knoldus . I have completed my B.tech in Computer Science stream from IMS Engineering College, Ghaziabad. I love to explore new technologies and have great interest in problem solving.

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading