Apache Beam: Side input Pattern

Apache Beam is a unified programming model for defining both batch and streaming data-parallel processing pipelines. It offers a rich set of APIs and mechanisms for solving complex use cases. In some of these use cases, a pipeline needs additional inputs beyond its main data. For example, in streaming analytics applications it is common to enrich data with additional information that is useful for further analysis.

Typical cases include the following. While processing transactional streaming data, the pipeline may need to do a database lookup and then make a decision based on the result. The result of one computation might be needed in another computation in the same pipeline. Additional datasets may need to be joined to the processed one. Some common values (e.g. a dictionary) may need to be broadcast to the processing functions. Apache Beam has a common pipeline pattern for implementing such scenarios: the side input pattern.

What is Side input in Apache Beam

In Apache Beam, a side input lets us provide additional inputs to a ParDo transform. That is, in addition to the main input PCollection, we can pass extra inputs to a ParDo transform in the form of side inputs. The DoFn can access a side input each time it processes an element of the input PCollection. When we specify a side input, we create a view of some other data that can be read from within the ParDo transform's DoFn while processing each element. A side input can be a single value or a collection; in the Java SDK, for example, a PCollection can be exposed as a singleton, list, iterable, or map view.
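
To make the idea concrete, here is a minimal plain-Java sketch of what a side input does conceptually. It is not Beam code: `SideInputModel`, `applyWithSideInput`, and the sample values are hypothetical and exist only for this illustration. The point is that the side value is prepared once and is then visible to the per-element function, much like a DoFn reading a view while it processes each element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Conceptual model only: plain Java, no Beam dependency.
// All names here are hypothetical and exist just for this illustration.
class SideInputModel {

    // Applies fn to every main-input element. fn also receives the side value,
    // which is the same object for every element (like a broadcast lookup table).
    static <T, S, R> List<R> applyWithSideInput(List<T> mainInput, S sideValue,
                                                BiFunction<T, S, R> fn) {
        List<R> output = new ArrayList<>();
        for (T element : mainInput) {
            output.add(fn.apply(element, sideValue));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> tickers = List.of("GOOG", "XYZ");               // illustrative main input
        Map<String, String> names = Map.of("GOOG", "Alphabet Inc."); // illustrative lookup table
        List<String> enriched = applyWithSideInput(tickers, names,
                (ticker, lookup) -> ticker + " -> " + lookup.getOrDefault(ticker, "unknown"));
        System.out.println(enriched);
    }
}
```

In a real pipeline the runner, not a local loop, is responsible for making the side value available to every worker.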

Apache Beam Side input example

To see how the side input pattern can be used in an Apache Beam pipeline, let's implement a simple use case: stock price analysis of Google stock data for 2020. The structure of the data is shown below:

Date,Open,High,Low,Close,Adj Close,Volume
2020-03-19,1088.219971,1152.609985,1055.550049,1111.670044,1111.670044,3703200

The task is to find the months in which the average monthly closing price is greater than the global average adjusted closing price for the year 2020.

Apache Beam batch pipeline steps to implement the above use case

1. The first step is to read the data from the source and create a PCollection.

 // CSV_HEADER is assumed to be a String constant holding the header row of the file.
 PCollection<String> readingGoogleStock = pipeline
         .apply("ReadingGoogleStock",
                 TextIO.read().from("src/main/resources/source/google_stock_20202.csv"))
         .apply("FilteringHeader", Filter
                 .by((String line) -> !line.isEmpty() && !line.equals(CSV_HEADER)));
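
The snippet above references `CSV_HEADER` without showing its definition. Here is a minimal sketch of the same predicate in plain Java, assuming the constant holds exactly the header row from the sample data; `HeaderFilter` and `isDataLine` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Assumption: CSV_HEADER equals the header row shown in the sample data.
class HeaderFilter {
    static final String CSV_HEADER = "Date,Open,High,Low,Close,Adj Close,Volume";

    // Same test as the Filter.by(...) lambda: keep non-empty lines that are not the header.
    static boolean isDataLine(String line) {
        return !line.isEmpty() && !line.equals(CSV_HEADER);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                CSV_HEADER,
                "2020-03-19,1088.219971,1152.609985,1055.550049,1111.670044,1111.670044,3703200",
                "");
        List<String> data = lines.stream()
                .filter(HeaderFilter::isDataLine)
                .collect(Collectors.toList());
        System.out.println(data.size() + " data line(s) kept");
    }
}
```

Because the comparison is an exact string match, a header line with trailing whitespace or different casing would slip through, so the constant has to match the file exactly.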

2. Then extract the adjusted closing price (the Adj Close column) and compute its global average for the year 2020.

 PCollectionView<Double> averageAdjustableClosing = readingGoogleStock
         .apply("ExtractAdjustableClosingPrice", FlatMapElements
                 .into(TypeDescriptors.doubles())
                 // Adj Close is the sixth column (index 5) of each row.
                 .via((String line) -> Collections.singletonList(
                         Double.parseDouble(line.split(",")[5]))))
         .apply("GlobalAverageAdjustableClosing",
                 Combine.globally(new Average()).asSingletonView());

This global average adjusted closing price of the Google stock data is stored as a PCollectionView. The view is then passed as a side input to a later transform.
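
The `Average` class passed to `Combine.globally` is not shown in this post. A common shape for such a combiner is a running sum plus a count. The sketch below is plain Java and only mirrors the four steps that Beam's `Combine.CombineFn` defines (`createAccumulator`, `addInput`, `mergeAccumulators`, `extractOutput`). `AverageSketch` and `Accum` are hypothetical names, and the author's real class may differ.

```java
import java.util.List;

// Plain-Java sketch of an averaging combiner; not the author's Average class.
// In a real pipeline this logic would live in a class extending Beam's Combine.CombineFn.
class AverageSketch {

    // Accumulator: running sum and element count.
    static class Accum {
        double sum = 0.0;
        long count = 0;
    }

    Accum createAccumulator() {
        return new Accum();
    }

    Accum addInput(Accum acc, Double input) {
        acc.sum += input;
        acc.count++;
        return acc;
    }

    // Partial accumulators are merged by adding their sums and counts.
    Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum a : accums) {
            merged.sum += a.sum;
            merged.count += a.count;
        }
        return merged;
    }

    // Returns 0.0 for empty input (an arbitrary choice made for this sketch).
    Double extractOutput(Accum acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AverageSketch fn = new AverageSketch();
        Accum left = fn.addInput(fn.addInput(fn.createAccumulator(), 1.0), 2.0);
        Accum right = fn.addInput(fn.createAccumulator(), 6.0);
        System.out.println(fn.extractOutput(fn.mergeAccumulators(List.of(left, right))));
    }
}
```

Keeping sum and count separate, rather than averaging partial averages, is what makes the merge step give the right answer when the data is split across workers.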

3. Next, compute the average closing price of the Google stock data per month (the code reads the Adj Close column). Apply the following series of transforms to do this.

PCollection<KV<Integer, Double>> monthClosingPricesKV = readingGoogleStock
        .apply("ExtractMonthPricesKV", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.doubles()))
                .via((String line) -> {
                    String[] fields = line.split(",");
                    // Dates look like 2020-03-19, so the pattern is yyyy-MM-dd.
                    final DateTime utcDateTime = LocalDateTime.parse(fields[0].trim(),
                            DateTimeFormat.forPattern("yyyy-MM-dd")).toDateTime(DateTimeZone.forID("UTC"));
                    // Key: month number (1-12); value: Adj Close (column index 5).
                    return KV.of(utcDateTime.getMonthOfYear(), Double.parseDouble(fields[5]));
                }))
        .apply("AverageMonthPrice", Combine.perKey(new Average()));
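
As a rough cross-check of this step outside Beam, the same key extraction and per-key average can be sketched with plain Java streams. This version uses `java.time` rather than the Joda-Time classes in the snippet above, and the rows in `main` are made-up values, not real stock data. `MonthlyAverage` and `averageByMonth` are hypothetical names.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java cross-check of the per-month average; not part of the Beam pipeline.
class MonthlyAverage {

    // Groups CSV rows by the month of the Date column (index 0)
    // and averages the Adj Close column (index 5) within each month.
    static Map<Integer, Double> averageByMonth(List<String> rows) {
        return rows.stream()
                .map(row -> row.split(","))
                .collect(Collectors.groupingBy(
                        fields -> LocalDate.parse(fields[0].trim()).getMonthValue(),
                        TreeMap::new,
                        Collectors.averagingDouble(fields -> Double.parseDouble(fields[5]))));
    }

    public static void main(String[] args) {
        // Made-up rows in the same column layout as the sample data.
        List<String> rows = List.of(
                "2020-01-02,0,0,0,0,100.0,0",
                "2020-01-03,0,0,0,0,200.0,0",
                "2020-02-03,0,0,0,0,400.0,0");
        System.out.println(averageByMonth(rows));
    }
}
```

A helper like this can be handy for checking a small slice of the input by hand before running the full pipeline.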

4. Finally, specify the side input in the pipeline. The last transform is a ParDo with a DoFn that checks which months have an average stock price greater than the global average adjusted closing price.

monthClosingPricesKV.apply("SideInput", ParDo.of(new DoFn<KV<Integer, Double>, Void>() {

    @ProcessElement
    public void processElement(ProcessContext processContext) {
        // Read the singleton side input computed in step 2.
        Double globalAverageValue = processContext.sideInput(averageAdjustableClosing);
        // Strictly greater, as the task statement requires.
        if (processContext.element().getValue() > globalAverageValue) {
            System.out.println("Month " + processContext.element().getKey()
                    + " has average closing price " + processContext.element().getValue()
                    + " greater than global adjusted closing price: " + globalAverageValue);
        }
    }
}).withSideInputs(averageAdjustableClosing));

The global average adjusted closing price computed in the second step is passed as the side input through withSideInputs, and the DoFn reads it with processContext.sideInput(...) while processing each month.
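
The comparison inside the DoFn can also be exercised without a runner. The sketch below is plain Java with the side value passed as an ordinary parameter; `AboveAverageMonths`, `monthsAbove`, and the numbers in `main` are hypothetical and only illustrate the selection rule (strictly greater than the global average, as the task states).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java version of the final comparison; the global average plays the role of the side input.
class AboveAverageMonths {

    // Returns, in ascending order, the months whose average is strictly greater than globalAverage.
    static List<Integer> monthsAbove(Map<Integer, Double> monthlyAverages, double globalAverage) {
        return monthlyAverages.entrySet().stream()
                .filter(e -> e.getValue() > globalAverage)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up monthly averages and global average.
        Map<Integer, Double> monthly = Map.of(1, 150.0, 2, 400.0, 3, 250.0);
        System.out.println(monthsAbove(monthly, 250.0));
    }
}
```

Returning the months as a list, instead of printing them, also makes the result easy to assert on in a unit test.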

Complete code can be found here

Thanks for reading. Stay connected for more future blogs!
