Stateful processing with Apache Beam

Overview

  • Beam lets us process unbounded, out-of-order, global-scale data with portable high-level pipelines.
  • Stateful processing is a new feature of the Beam model that expands the capabilities of Beam.
  • With these new features, we can unlock new use cases and new efficiencies.

Quick Recap

  • In Beam, a big data processing pipeline is a directed, acyclic graph of parallel operations called PTransforms processing data from PCollections.

  • In the pipeline diagram, the boxes are PTransforms and the edges represent the data in PCollections flowing from one PTransform to the next.
  • A PCollection can be bounded or unbounded.
  • The cylinders are the data sources and sinks at the edges of your pipeline, such as bounded collections of log files or unbounded data streaming over a Kafka topic.
  • For more information, please refer to the blog. In this blog, we will discuss stateful processing.

How stateful processing works in Beam

  • DoFn expresses the processing logic of our ParDo transformation which applies to each element.
  • Without stateful augmentations, a DoFn is a mostly-pure function from inputs to one or more outputs, corresponding to the Mapper in a MapReduce.
  • With state, a DoFn has the ability to access persistent mutable state while processing each input element.
  • Let us consider this example:

  • The first thing to note is that all the data – the little squares, circles, and triangles – are red.
  • This indicates that stateful processing occurs in the context of a single key – all of the elements are key-value pairs with the same key.
  • Calls from your chosen Beam runner to the DoFn are colored in yellow, while calls from the DoFn to the runner are in purple:
  • The runner invokes the DoFn's @ProcessElement method on each element for a key+window.
  • In addition, the DoFn reads and writes state – the curved arrows to/from the storage on the side.
  • The DoFn emits output to the runner as usual via ProcessContext.output. Additional outputs (called side outputs, via ProcessContext.sideOutput, in early Beam releases) go through the overload of output that takes a TupleTag.
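
The key+window scoping described above can be modelled in plain Java. This is only a sketch of the semantics, not Beam code: the class KeyedStateModel, its Cell type, and the string keys are hypothetical names introduced for illustration, and a real runner stores state durably rather than in a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-key-and-window state; this is NOT Beam API. */
class KeyedStateModel {

  /** Hypothetical stand-in for a single-value state cell. */
  static final class Cell {
    private Integer value;            // null until the first write
    Integer read() { return value; }
    void write(Integer v) { value = v; }
  }

  // The "runner" owns the storage: one cell per (key, window) pair.
  private final Map<String, Cell> storage = new HashMap<>();
  final List<String> outputs = new ArrayList<>();

  /** What the runner does per element: find the cell, then call user code. */
  void deliver(String key, String window, String element) {
    Cell cell = storage.computeIfAbsent(key + "|" + window, k -> new Cell());
    processElement(key, element, cell);
  }

  /** User logic: emit the next index for this key and window, then store it. */
  private void processElement(String key, String element, Cell index) {
    int current = index.read() == null ? 0 : index.read();
    outputs.add(key + ":" + element + "=" + current);
    index.write(current + 1);
  }

  static List<String> demo() {
    KeyedStateModel runner = new KeyedStateModel();
    runner.deliver("red", "w1", "square");
    runner.deliver("red", "w1", "circle");
    runner.deliver("blue", "w1", "square");   // different key: fresh cell
    runner.deliver("red", "w2", "triangle");  // different window: fresh cell
    return runner.outputs;
  }
}
```

Calling KeyedStateModel.demo() shows that the counter advances only for elements that share both key and window; a new key or a new window starts again from zero.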

State in Beam’s Java SDK

  • Here is the code for a stateful DoFn that assigns an arbitrary but consistent index to each element on a per key-and-window basis.
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current+1);
  }
}
  • The first thing to look at is the pair of @StateId("index") annotations. They indicate that this DoFn uses a mutable state cell named “index”.
  • The Beam Java SDK, and from there our chosen runner, also reads these annotations and uses them to wire up the DoFn correctly.
  • The first @StateId("index") annotates a field of type StateSpec (for “state specification”). This declares and configures the state cell.
  • The type parameter ValueState describes the kind of state we get out of this cell: ValueState stores just a single value. Note that the spec itself is not a usable state cell; the runner provides that during pipeline execution.
  • To fully specify a ValueState cell, we need to provide the coder that the runner will use (as necessary) to serialize the value we store. This is the invocation StateSpecs.value(VarIntCoder.of()).
  • The second @StateId("index") annotation is on a parameter of the @ProcessElement method. This indicates access to the ValueState cell declared by the field.
  • The state is accessed in the simplest way: read() to read it, and write(newValue) to write it.
  • The other features of DoFn are available in the usual way, such as context.output(...).

Example: anomaly detection

  • Suppose we are feeding a stream of actions by our user into some complex model to predict some quantitative expression of the sorts of actions they take, for example to detect fraudulent activity.
  • We will build up the model from events, and also compare incoming events against the latest model to determine if something has changed.
  • If we try to express the building of our model as a CombineFn, we may have trouble with mergeAccumulators. Assuming we could express that, it might look something like this:
class ModelFromEventsFn extends CombineFn<Event, Model, Model> {
    @Override
    public Model createAccumulator() {
      return Model.empty();
    }

    @Override
    public Model addInput(Model accumulator, Event input) {
      return accumulator.update(input); // this is encouraged to mutate, for efficiency
    }

    @Override
    public Model mergeAccumulators(Iterable<Model> accumulators) {
      // ?? can you write this ??
      throw new UnsupportedOperationException("merging models is the open question");
    }

    @Override
    public Model extractOutput(Model accumulator) {
      return accumulator;
    }
}
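
For contrast, here is a plain-Java sketch of the same four steps for an aggregate where merging is easy: a running mean. It is not Beam code; MeanCombineSketch and Acc are hypothetical names, and the methods only mirror the shape of CombineFn. Merging works because partial sums and counts can be added in any order, which is exactly what a model that depends on event order may not allow.

```java
import java.util.Arrays;

/** Plain-Java sketch of the four combine steps for a running mean; not Beam API. */
class MeanCombineSketch {

  /** Accumulator: a partial sum and a partial count. */
  static final class Acc {
    double sum;
    long count;
  }

  static Acc createAccumulator() {
    return new Acc();
  }

  static Acc addInput(Acc acc, double input) {
    acc.sum += input;   // mutate in place and return the same accumulator
    acc.count++;
    return acc;
  }

  /** Merging is easy here because addition is associative and commutative. */
  static Acc mergeAccumulators(Iterable<Acc> accs) {
    Acc merged = createAccumulator();
    for (Acc a : accs) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  static double extractOutput(Acc acc) {
    return acc.count == 0 ? 0.0 : acc.sum / acc.count;
  }

  /** Two partial accumulators, as if built on two workers, then merged. */
  static double demo() {
    Acc left = addInput(addInput(createAccumulator(), 1.0), 2.0);
    Acc right = addInput(createAccumulator(), 6.0);
    return extractOutput(mergeAccumulators(Arrays.asList(left, right)));
  }
}
```

MeanCombineSketch.demo() merges a partial result over {1.0, 2.0} with one over {6.0} and returns the mean of all three values.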
  • Now we have a way to compute the model of a particular user for a window as Combine.perKey(new ModelFromEventsFn()).
  • A standard way to take the result of a Combine transform and use it while processing the elements of a PCollection is to read it as a side input to a ParDo transform.
  • So we could side input the model and check the stream of events against it, outputting the prediction, like so:
PCollection<KV<UserId, Event>> events = ...

final PCollectionView<Map<UserId, Model>> userModels = events
    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());

PCollection<KV<UserId, Prediction>> predictions = events
    .apply(ParDo.of(new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        UserId userId = ctx.element().getKey();
        Event event = ctx.element().getValue();

        // Look up this user's model in the side input map
        Model model = ctx.sideInput(userModels).get(userId);

        // Perhaps some logic around when to output a new prediction
        … ctx.output(KV.of(userId, model.prediction(event))) …
      }
    }).withSideInputs(userModels)); // the side input must be declared on the ParDo
  • In this pipeline, there is just one model emitted by the Combine.perKey(...) per user, per window, which is then prepared for side input by the View.asMap() transform.
  • The processing of the ParDo over events will block until that side input is ready, buffering events, and will then check each event against the model.
  • This is a high latency, high completeness solution: The model takes into account all user behavior in the window, but there can be no output until the window is complete.
  • Suppose we want to get some results earlier, or don’t even have any natural windowing, but just want continuous analysis with the “model so far”, even though our model may not be as complete.
  • Triggers are the generic Beam feature for managing completeness versus latency tradeoffs. So here is the same pipeline with an added trigger that outputs a new model one second after input arrives.
PCollection<KV<UserId, Event>> events = ...

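PCollectionView<Map<UserId, Model>> userModels = events

    // A tradeoff between latency and cost
    .apply(Window.<KV<UserId, Event>>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        // A custom trigger also needs an accumulation mode and an allowed lateness.
        // These two settings are assumptions, chosen so each pane carries the model so far.
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO))

    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());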

Stateful Processing Code

  • Stateful processing lets us address both the latency problem of side inputs and the cost problem of excessive uninteresting output. Here is the code, using only the features introduced so far:
new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

  @StateId("model")
  private final StateSpec<ValueState<Model>> modelSpec =
      StateSpecs.value(Model.coder());

  @StateId("previousPrediction")
  private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
      StateSpecs.value(Prediction.coder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("previousPrediction") ValueState<Prediction> previousPredictionState,
      @StateId("model") ValueState<Model> modelState) {
    UserId userId = c.element().getKey();
    Event event = c.element().getValue();

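    // State is empty for the first element of a key+window, so start from an empty model
    Model model = firstNonNull(modelState.read(), Model.empty());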
    Prediction previousPrediction = previousPredictionState.read();
    Prediction newPrediction = model.prediction(event);
    model.add(event);
    modelState.write(model);
    if (previousPrediction == null
        || shouldOutputNewPrediction(previousPrediction, newPrediction)) {
      c.output(KV.of(userId, newPrediction));
      previousPredictionState.write(newPrediction);
    }
  }
};
  • We have two state cells declared: @StateId("model") holds the current state of the model for a user, and @StateId("previousPrediction") holds the prediction that was output previously.
  • Access to the two state cells by annotation in the @ProcessElement method is as before.
  • We read the current model via modelState.read(). Because state is per key-and-window, this is a model just for the UserId of the Event currently being processed.
  • We derive a new prediction with model.prediction(event) and compare it against the last one output, accessed via previousPredictionState.read().
  • We then update the model with model.add(event) and write it back via modelState.write(...).
  • It is perfectly fine to mutate the value we pulled out of state as long as we also remember to write the mutated value, in the same way we are encouraged to mutate CombineFn accumulators.
  • If the prediction has changed a significant amount since the last time we output, we emit it via c.output(...) and save it using previousPredictionState.write(...).
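
The read, predict, update, and conditionally emit cycle can be traced with a small plain-Java model. This is a sketch under stated assumptions, not Beam code: the model is a hypothetical running mean, the threshold in shouldOutputNewPrediction is invented for the example, and two HashMaps stand in for the two state cells.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the model-plus-previous-prediction pattern; not Beam API. */
class PredictionStateModel {

  /** Hypothetical per-user model: a running mean of event values. */
  static final class Model {
    double sum;
    long count;
    /** Predicted mean if this event were included. */
    double prediction(double event) { return (sum + event) / (count + 1); }
    void add(double event) { sum += event; count++; }
  }

  // Stand-ins for the "model" and "previousPrediction" state cells, one entry per user.
  private final Map<String, Model> modelState = new HashMap<>();
  private final Map<String, Double> previousPredictionState = new HashMap<>();
  final List<String> outputs = new ArrayList<>();

  /** Assumed rule for "changed a significant amount": a shift of at least 1.0. */
  static boolean shouldOutputNewPrediction(double previous, double next) {
    return Math.abs(next - previous) >= 1.0;
  }

  void processElement(String userId, double event) {
    Model model = modelState.computeIfAbsent(userId, k -> new Model());
    Double previous = previousPredictionState.get(userId);
    double newPrediction = model.prediction(event);
    model.add(event);               // mutate, then store the updated model
    modelState.put(userId, model);
    if (previous == null || shouldOutputNewPrediction(previous, newPrediction)) {
      outputs.add(userId + "=" + newPrediction);
      previousPredictionState.put(userId, newPrediction);
    }
  }

  static List<String> demo() {
    PredictionStateModel fn = new PredictionStateModel();
    for (double e : new double[] {10.0, 10.0, 10.0, 16.0}) {
      fn.processElement("alice", e);
    }
    fn.processElement("bob", 5.0);
    return fn.outputs;
  }
}
```

In the demo, the second and third events for "alice" leave the prediction unchanged and produce no output, which is the saving in uninteresting output that the stateful version is meant to deliver.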

Performance considerations

  • Partitioning per-key-and-window:
    • The runner may have to shuffle your data to colocate all the data for a particular key+window.
    • If the data is already shuffled correctly, the runner may take advantage of this.
  • Storage and fault tolerance of state:
    • Since state is per-key-and-window, the more keys and windows we expect to process, the more storage we will incur.
    • Because state benefits from all the fault tolerance / consistency properties of your other data in Beam, it also adds to the cost of committing the results of processing.
  • Expiration of state:
    • Since state is per-window, the runner can reclaim the resources when a window expires.
    • But this could mean that the runner is tracking an additional timer per key and window to cause reclamation code to execute.
  • Synchronization overhead:
    • The API is designed so the runner takes care of concurrency control.
    • But this means that the runner cannot parallelize processing of elements for a particular key + window.
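
The storage and expiration points can be illustrated with a plain-Java model. This is only a sketch, not how any particular runner is implemented: StateExpirationModel is a hypothetical name, the watermark is a plain long, and allowed lateness is ignored for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of per-window state reclamation; not Beam API. */
class StateExpirationModel {

  // Window end timestamp -> (key -> state value), sorted by window end.
  private final TreeMap<Long, Map<String, Integer>> stateByWindowEnd = new TreeMap<>();

  /** Count one element for its key in the window ending at windowEnd. */
  void process(String key, long windowEnd) {
    stateByWindowEnd
        .computeIfAbsent(windowEnd, w -> new HashMap<>())
        .merge(key, 1, Integer::sum);
  }

  /** Number of live state cells; this grows with keys times windows. */
  int liveCells() {
    int n = 0;
    for (Map<String, Integer> perKey : stateByWindowEnd.values()) {
      n += perKey.size();
    }
    return n;
  }

  /** Drop the state of every window that ended at or before the watermark. */
  void advanceWatermark(long watermark) {
    stateByWindowEnd.headMap(watermark, true).clear();
  }

  /** Returns the live cell count before and after the first window expires. */
  static int[] demo() {
    StateExpirationModel m = new StateExpirationModel();
    m.process("a", 100L);
    m.process("b", 100L);
    m.process("a", 200L);
    m.process("a", 200L);   // same key and window: reuses the existing cell
    int before = m.liveCells();
    m.advanceWatermark(100L);
    return new int[] {before, m.liveCells()};
  }
}
```

The demo holds three cells (two keys in the first window, one in the second) and keeps only one after the first window expires, which is the bookkeeping that the extra per-key-and-window timer pays for.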

Conclusion

In this blog, we learned how to implement stateful processing in Apache Beam, along with its advantages and performance considerations.



Written by 

I am a technology enthusiast with 3+ years of experience. I have worked on Core Java, Apache Flink, Apache Beam, AWS, GCP, Kafka, Spark, and MySQL. I am curious about learning new technologies.