Apache Beam: Ways to join PCollections


Joining multiple sets of data into a single entity is a very common task when working with data pipelines. In this blog, we will cover how to perform join operations between datasets in Apache Beam. There are different ways to join PCollections in Apache Beam –

  1. Extension-based joins
  2. Group-by-key-based joins
  3. Join using side input

Let’s understand the above ways to perform joins with examples. We have two datasets (CSV files): mall customers’ income data and the corresponding mall customers’ spending scores. The structure of the datasets –

Mall customers income dataset –

CustomerID,Gender,Age,Annual Income (k$)
0001,Male,19,15

Mall customers spending score dataset –

CustomerID,Spending Score (1-100)
0001,39
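
All of the snippets below assume a Beam pipeline has already been created. A minimal setup might look like this (the class name is illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MallCustomerJoins { // illustrative class name

        public static void main(String[] args) {
            // Build pipeline options from command-line args, then the pipeline itself
            PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
            Pipeline pipeline = Pipeline.create(options);

            // ... the transforms from the sections below go here ...

            pipeline.run().waitUntilFinish();
        }
    }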

1. Extension-based joins

Beam supplies a Join library that is useful for performing join operations, but the data still needs to be prepared before the join. The first task is to read both datasets into PCollection key-value objects.
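
The Join library ships as a separate Beam extension (the beam-sdks-java-extensions-join-library Maven artifact), so the snippets in this section assume the following imports:

    import org.apache.beam.sdk.extensions.joinlibrary.Join;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;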

Reading the mall customers income dataset and creating a PCollection key-value object, with CustomerID as the key and the corresponding customer’s Gender as the value.

     LOGGER.info("Reading Mall customers income data");
    
            PCollection<KV<String, String>> customerIdGenderKV =
    
     pipeline.apply("ReadingMallCustomersIncome", TextIO.read()
    
              .from("src/main/resources/source/Mall_Customers_Income.csv"))
    
              .apply("FilterHeader", Filter.by(line -> !line.isEmpty() &&
    
                         !line.contains("CustomerID,Gender,Age,Annual Income (k$)")))
    
              .apply("IdGenderKV", MapElements
    
                      .into(TypeDescriptors.kvs(TypeDescriptors.strings(),
    
     TypeDescriptors.strings()))
    
                      .via((String line) -> {
    
                            String[] tokens = line.split(",");
    
                            return KV.of(tokens[0], tokens[1]);
    
                         }));

Reading the mall customers spending score dataset and creating a PCollection key-value object, with CustomerID as the key and the corresponding customer’s spending score as the value.

    LOGGER.info("Reading Mall customers spending scores data");
    
    PCollection<KV<String, Integer>> customerIdScoreKV =
    
     pipeline.apply("ReadingMallCustomersScores", TextIO.read()
    
            .from("src/main/resources/source/Mall_Customers_Scoring.csv"))
    
            .apply("FilterHeader", Filter.by(line -> !line.isEmpty() &&
    
                    !line.contains("CustomerID,Spending Score (1-100)")))
    
            .apply("IdGenderKV", MapElements
    
                    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
    
                    .via((String line) -> {
    
                        String[] tokens = line.split(",");
    
                        return KV.of(tokens[0], Integer.parseInt(tokens[1]));
    
                    }));

Now, let’s join the above two PCollections.

    a) Inner Join

Performing an inner join on the CustomerID key. Every record from the left PCollection (customerIdGenderKV) is joined with the corresponding record from the right PCollection (customerIdScoreKV). Only records with matching keys in both PCollections will be present in the join result.

    LOGGER.info("Inner Join of customerIdGenderKV  and customerIdScoreKV PCollection");
    
    Join.innerJoin(customerIdGenderKV, customerIdScoreKV)
    
            .apply("JoinedData", MapElements
    
                    .into(TypeDescriptors.voids())
    
            .via((KV<String, KV<String, Integer>> joinedStream) -> {
    
                LOGGER.info("customerId :{} , Gender: {} , spending score: {}",
    
                        joinedStream.getKey(), 
    
    joinedStream.getValue().getKey(), joinedStream.getValue().getValue());
    
                return null;
    
            }));
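
With the sample rows above (customer 0001 present in both datasets), this should log something like:

    customerId :0001 , Gender: Male , spending score: 39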

    b) Left Outer Join

In this join, all of the records from the left PCollection will be present in the final result. Records in the left PCollection that have no match in the right PCollection are paired with the specified null-value object. Here, customers with no match in the spending score PCollection are assigned -1.

    LOGGER.info("Left outer Join of customerIdGenderKV 
    
     and customerIdScoreKV PCollection");
    
    Join.leftOuterJoin(customerIdGenderKV, customerIdScoreKV, -1)
    
            .apply("JoinedData", MapElements
    
                    .into(TypeDescriptors.voids())
    
                    .via((KV<String, KV<String, Integer>> joinedStream) -> {
    
                        LOGGER.info("customerId :{} , Gender: {} , spending score: {}",
    
                                joinedStream.getKey(), joinedStream.getValue().getKey(),
    
     joinedStream.getValue().getValue());
    
                        return null;
    
                    }));
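
So if, say, a hypothetical customer 0002 had an income row but no spending score row, the left outer join would still emit it, logging -1 as the score.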

    c) Right Outer Join

This is exactly the same as the left outer join, except that every record from the right PCollection will be present in the joined result. Records in the right PCollection that have no match in the left PCollection get the specified null-value object in place of the missing left value. Here, some customers in the right PCollection have no Gender in the left PCollection, so we specify it as "unavailable".

    LOGGER.info("Right outer Join of customerIdGenderKV  
    
    and customerIdScoreKV PCollection");
    
    Join.rightOuterJoin(customerIdGenderKV, customerIdScoreKV, "unavailable")
    
            .apply("JoinedData", MapElements
    
                    .into(TypeDescriptors.voids())
    
                    .via((KV<String, KV<String, Integer>> joinedStream) -> {
    
                        LOGGER.info("customerId :{} , Gender: {} , spending score: {}",
    
                                joinedStream.getKey(), joinedStream.getValue().getKey(),
    
     joinedStream.getValue().getValue());
    
                        return null;
                    }));

d) Full Outer Join

In this operation, all of the records from both the left and right PCollections will be present in the result. Any missing value is filled with the respective null-value object specified for the left and right PCollections – here, "others" for a missing Gender and -1 for a missing spending score.

    Join.fullOuterJoin(customerIdGenderKV, customerIdScoreKV, "others", -1)
            .apply("JoinedData", MapElements
                    .into(TypeDescriptors.voids())
                    .via((KV<String, KV<String, Integer>> joinedStream) -> {
                        LOGGER.info("customerId :{} , Gender: {} , spending score: {}",
                                joinedStream.getKey(),
                                joinedStream.getValue().getKey(),
                                joinedStream.getValue().getValue());
                        return null;
                    }));

2. Group-by-key-based joins

Beam also lets us perform join operations using the CoGroupByKey transform. There are four steps to perform a join with CoGroupByKey –

a) Define the PCollections to join

Let’s use the same datasets as above, i.e., the mall customers’ income data and spending score data. The spending score PCollection (customerScoring below) is essentially the same but keyed by an integer CustomerID, and the customers income PCollection changes slightly: the value for the CustomerID key is now the customer’s Annual Income (k$).

    PCollection<KV<Integer, Integer>> customerIncome = 
    
    pipeline.apply("ReadMallCustomerIncome"
    
    ......................
    
    ......................
    
b) Define a TupleTag corresponding to each created PCollection

    TupleTag<Integer> incomeTag = new TupleTag<>();
    TupleTag<Integer> scoreTag = new TupleTag<>();
c) Merge the PCollections with the org.apache.beam.sdk.transforms.join.CoGroupByKey transform

    PCollection<KV<Integer, CoGbkResult>> joinedResult = KeyedPCollectionTuple
            .of(incomeTag, customerIncome)
            .and(scoreTag, customerScoring)
            .apply(CoGroupByKey.create());
d) Process the received org.apache.beam.sdk.transforms.join.CoGbkResult with an appropriate transform

    joinedResult.apply("JoinedResult", MapElements
            .into(TypeDescriptors.voids())
            .via((KV<Integer, CoGbkResult> joinedStream) -> {
                LOGGER.info("customerId :{} , Customer income: {} , spending score: {}",
                        joinedStream.getKey(),
                        joinedStream.getValue().getOnly(incomeTag, 0),
                        joinedStream.getValue().getOnly(scoreTag, 0));
                return null;
            }));
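
Note that getOnly(tag, defaultValue) returns the single value for the tag (or the default when none exists) and is only safe when each key carries at most one record per PCollection. If a CustomerID could appear multiple times, iterate with getAll(tag) instead – a minimal sketch:

    joinedResult.apply("IterateJoinedResult", MapElements
            .into(TypeDescriptors.voids())
            .via((KV<Integer, CoGbkResult> joined) -> {
                // getAll() returns every value tagged for this key, so duplicate
                // CustomerIDs produce one log line per income/score combination
                for (Integer income : joined.getValue().getAll(incomeTag)) {
                    for (Integer score : joined.getValue().getAll(scoreTag)) {
                        LOGGER.info("customerId :{} , Customer income: {} , spending score: {}",
                                joined.getKey(), income, score);
                    }
                }
                return null;
            }));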

3. Join using side input

We can also perform a join using the Beam side input pattern. For that, one of the PCollections needs to be converted into a PCollectionView so that it can be passed as a side input. Let’s convert the customer score KV object into a PCollectionView of a map of key-value pairs (CustomerID to spending score).

    final PCollectionView<Map<String, Integer>> customerScoreView =
            customerIdScoreKV.apply(View.asMap());

Then, pass this view as a side input to a Beam ParDo transform.

    customerIdGenderKV.apply(ParDo.of(new DoFn<KV<String, String>, String>() {
        @ProcessElement
        public void processElement(ProcessContext processContext) {
            // Look up each customer's score in the side-input map
            Map<String, Integer> customerScores =
                    processContext.sideInput(customerScoreView);
            KV<String, String> element = processContext.element();
            Integer score = customerScores.get(element.getKey());
            LOGGER.info("customerId :{} , Gender: {} , spending score: {}",
                    element.getKey(), element.getValue(), score);
        }
    }).withSideInputs(customerScoreView));
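
One caveat: customerScores.get(...) returns null for a customer with no score row. To mirror the -1 default used in the left outer join above, you could fall back to a default instead:

    Integer score = customerScores.getOrDefault(element.getKey(), -1);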

    Complete code can be found here.

Thanks for reading! Stay connected for more future blogs!!
