Overview
- Beam lets us process unbounded, out-of-order, global-scale data with portable high-level pipelines.
- Stateful processing is a new feature of the Beam model that expands the capabilities of Beam.
- With these new features, we can unlock new use cases and new efficiencies.
Quick Recap
- In Beam, a big data processing pipeline is a directed, acyclic graph of parallel operations called PTransforms processing data from PCollections.
- The boxes are PTransforms and the edges represent the data in PCollections flowing from one PTransform to the next.
- A PCollection can be bounded or unbounded.
- Moreover, the cylinders are the data sources and sinks at the edges of your pipeline, such as bounded collections of log files or unbounded data streaming over a Kafka topic.
- For more background, please refer to the earlier blog; a minimal pipeline sketch is also shown below. In this blog, we will discuss stateful processing.
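To make the recap concrete, here is a minimal sketch of a Beam pipeline in Java. It is an illustrative assumption rather than part of the original post: the file paths and the uppercasing DoFn are hypothetical, but it shows PTransforms (the boxes) connected by PCollections (the edges), with a source and a sink (the cylinders) at either end.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class MinimalPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Source (cylinder): a bounded collection of log lines (hypothetical path)
    PCollection<String> lines = p.apply(TextIO.read().from("gs://my-bucket/logs/*"));

    // PTransform (box): a ParDo applying a DoFn to each element of the PCollection
    PCollection<String> upper = lines.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
      }
    }));

    // Sink (cylinder): write the results back out (hypothetical path)
    upper.apply(TextIO.write().to("gs://my-bucket/output/upper"));

    p.run().waitUntilFinish();
  }
}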
How does stateful processing work in Beam?
- A DoFn expresses the processing logic of our ParDo transformation, which applies to each element.
- Without stateful augmentations, a DoFn is a mostly-pure function from inputs to one or more outputs, corresponding to the Mapper in a MapReduce.
- With state, a DoFn has the ability to access persistent mutable state while processing each input element.
- Let us consider this example:
- The first thing to note is that all the data – the little squares, circles, and triangles – are red.
- This indicates that stateful processing occurs in the context of a single key – all of the elements are key-value pairs with the same key.
- Calls from your chosen Beam runner to the DoFn are colored in yellow, while calls from the DoFn to the runner are in purple:
- The runner invokes the DoFn's @ProcessElement method on each element for a key+window.
- In addition, the DoFn reads and writes state – the curved arrows to/from the storage on the side.
- The DoFn emits output (or side output) to the runner as usual via ProcessContext.output (resp. ProcessContext.sideOutput).
State in Beam’s Java SDK
- Here is the code for a stateful DoFn that assigns an arbitrary but consistent index to each element on a per key-and-window basis.
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    // firstNonNull (from Guava) defaults to 0 the first time this key+window is seen
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current + 1);
  }
}
- The first thing to look at is the presence of a couple of @StateId("index") annotations. This means we are using a mutable state cell named "index" in this DoFn.
- The Beam Java SDK, and from there your chosen runner, will note these annotations and use them to wire up your DoFn correctly.
- The first @StateId("index") is annotated on a field of type StateSpec (for "state specification"). This declares and configures the state cell.
- The type parameter ValueState describes the kind of state you can get out of this cell – ValueState stores just a single value. The spec itself is not a usable state cell – we need the runner to provide that during pipeline execution.
- To fully specify a ValueState cell, we need to provide the coder that the runner will use (as necessary) to serialize the value we will be storing. This is the invocation StateSpecs.value(VarIntCoder.of()).
- The second @StateId("index") annotation is on a parameter to the @ProcessElement method. This indicates access to the ValueState cell.
- The state is accessed in the simplest way: read() to read it, and write(newValue) to write it.
- The other features of DoFn are available in the usual way – such as context.output(...). A sketch of applying this stateful DoFn in a pipeline follows below.
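For context, here is a minimal sketch of how such a stateful DoFn might be wired into a pipeline. IndexAssigningFn is a hypothetical name for the anonymous DoFn above; the key point is that state is partitioned per key and window, so the input must already be a keyed PCollection.
// Assumes the anonymous DoFn above has been given the hypothetical name IndexAssigningFn
PCollection<KV<MyKey, MyValue>> keyedInput = ...

PCollection<KV<Integer, KV<MyKey, MyValue>>> indexed =
    keyedInput.apply("AssignIndexes", ParDo.of(new IndexAssigningFn()));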
Example: anomaly detection
- Suppose we are feeding a stream of actions by our users into some complex model to predict some quantitative expression of the sorts of actions they take, for example to detect fraudulent activity.
- We will build up the model from events, and also compare incoming events against the latest model to determine if something has changed.
- If we try to express the building of our model as a CombineFn, we may have trouble with mergeAccumulators. Assuming we could express that, it might look something like this:
class ModelFromEventsFn extends CombineFn<Event, Model, Model> {

  @Override
  public Model createAccumulator() {
    return Model.empty();
  }

  @Override
  public Model addInput(Model accumulator, Event input) {
    return accumulator.update(input); // this is encouraged to mutate, for efficiency
  }

  @Override
  public Model mergeAccumulators(Iterable<Model> accumulators) {
    // ?? can you write this ??
    throw new UnsupportedOperationException("merging these models may not be possible");
  }

  @Override
  public Model extractOutput(Model accumulator) {
    return accumulator;
  }
}
- Now we have a way to compute the model of a particular user for a window as Combine.perKey(new ModelFromEventsFn()).
- A standard way to take the result of a Combine transform and use it while processing the elements of a PCollection is to read it as a side input to a ParDo transform.
- So we could side input the model and check the stream of events against it, outputting the prediction, like so:
PCollection<KV<UserId, Event>> events = ...

final PCollectionView<Map<UserId, Model>> userModels = events
    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());

PCollection<KV<UserId, Prediction>> predictions = events
    .apply(ParDo.of(new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        UserId userId = ctx.element().getKey();
        Event event = ctx.element().getValue();

        Model model = ctx.sideInput(userModels).get(userId);

        // Perhaps some logic around when to output a new prediction
        … ctx.output(KV.of(userId, model.prediction(event))) …
      }
    }).withSideInputs(userModels));
- In this pipeline, there is just one model emitted by the Combine.perKey(...) per user, per window, which is then prepared for side input by the View.asMap() transform.
- The processing of the ParDo over events will block until that side input is ready, buffering events, and will then check each event against the model.
- This is a high latency, high completeness solution: the model takes into account all user behavior in the window, but there can be no output until the window is complete.
- Suppose we want to get some results earlier, or don’t even have any natural windowing, but just want continuous analysis with the “model so far”, even though our model may not be as complete.
- Triggers are the generic Beam feature for managing completeness versus latency tradeoffs. So here is the same pipeline with an added trigger that outputs a new model one second after input arrives:
PCollection<KV<UserId, Event>> events = ...

PCollectionView<Map<UserId, Model>> userModels = events
    // A tradeoff between latency and cost: emit a refined model
    // one second after the first input of each pane arrives
    .apply(Window.<KV<UserId, Event>>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO)) // Beam requires these alongside a trigger
    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());
Stateful Processing Code
- Stateful processing lets us address both the latency problem of side inputs and the cost problem of excessive uninteresting output. Here is the code, using only the features described above:
new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {
@StateId("model")
private final StateSpec<ValueState<Model>> modelSpec =
StateSpecs.value(Model.coder());
@StateId("previousPrediction")
private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
StateSpecs.value(Prediction.coder());
@ProcessElement
public void processElement(
ProcessContext c,
@StateId("previousPrediction") ValueState<Prediction> previousPredictionState,
@StateId("model") ValueState<Model> modelState) {
UserId userId = c.element().getKey();
Event event = c.element().getValue();
Model model = modelState.read();
Prediction previousPrediction = previousPredictionState.read();
Prediction newPrediction = model.prediction(event);
model.add(event);
modelState.write(model);
if (previousPrediction == null
|| shouldOutputNewPrediction(previousPrediction, newPrediction)) {
c.output(KV.of(userId, newPrediction));
previousPredictionState.write(newPrediction);
}
}
};
- We have two state cells declared, @StateId("model") to hold the current state of the model for a user and @StateId("previousPrediction") to hold the prediction output previously.
- Access to the two state cells by annotation in the @ProcessElement method is as before.
- We read the current model, per key-and-window, via modelState.read().
- This is a model just for the UserId of the Event currently being processed.
- We derive a new prediction model.prediction(event) and compare it against the last one output, accessed via previousPredictionState.read().
- We then update the model via model.add(event) and write it back via modelState.write(...).
- It is perfectly fine to mutate the value we pulled out of state as long as we also remember to write the mutated value, in the same way we are encouraged to mutate CombineFn accumulators.
- If the prediction has changed a significant amount since the last time we output, we emit it via c.output(...).
- And we save the prediction using previousPredictionState.write(...). A sketch of wiring this stateful DoFn into the pipeline follows below.
Performance considerations
- Partitioning per-key-and-window:
- The runner may have to shuffle your data to colocate all the data for a particular key+window.
- If the data is already shuffled appropriately, the runner may take advantage of this and avoid a second shuffle.
- Storage and fault tolerance of state:
- Since state is per-key-and-window, the more keys and windows we expect to process, the more storage we will incur.
- State benefits from all the fault tolerance / consistency properties of your other data in Beam, but it also adds to the cost of committing the results of processing.
- Expiration of state:
- Also, since state is per-window, the runner can reclaim the resources when a window expires.
- But this could mean that the runner is tracking an additional timer per key and window to cause reclamation code to execute.
- Synchronization overhead:
- The API is designed so the runner takes care of concurrency control.
- But this means that the runner cannot parallelize processing of elements for a particular key + window.
Conclusion
In this blog, we learned how to implement stateful processing in Apache Beam, along with its advantages and performance considerations.
