
Introduction
Transforms in Apache Beam are the operations in your pipeline, and they provide a generic processing framework. You provide processing logic in the form of a function object (colloquially referred to as "user code"), and your user code is applied to each element of an input PCollection (or more than one PCollection).
Core Beam transforms
Beam provides the following core transforms, each of which represents a different processing paradigm:
ParDo
GroupByKey
CoGroupByKey
Combine
Flatten
Partition
ParDo
ParDo is a Beam transform for generic parallel processing. A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.
ParDo is useful for a variety of common data processing operations, including:
- Filtering a data set. You can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.
- Formatting or type-converting each element in a data set. If your input PCollection contains elements that are of a different type or format than you want, you can use ParDo to perform a conversion on each element and output the result to a new PCollection.
- Extracting parts of each element in a data set. If you have a PCollection of records with multiple fields, for example, you can use a ParDo to parse out just the fields you want to consider into a new PCollection.
- Performing computations on each element in a data set. You can use ParDo to perform simple or complex computations on every element, or certain elements, of a PCollection and output the results as a new PCollection.
Applying ParDo
public static void main(String[] args) {
  Pipeline p = Pipeline.create(
      PipelineOptionsFactory.fromArgs(args).withValidation().create());

  p.apply(Create.of("Hello", "World"))
      // Upper-case each element with a simple element-wise transform.
      .apply(MapElements.via(new SimpleFunction<String, String>() {
        @Override
        public String apply(String input) {
          return input.toUpperCase();
        }
      }))
      // ParDo with a Void output type: the DoFn only logs each element.
      // LOG is assumed to be an org.slf4j.Logger defined on the enclosing class.
      .apply(ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          LOG.info(c.element());
        }
      }));

  p.run();
}
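The DoFn above declares a Void output type and only logs each element. To illustrate the filtering use case described earlier, the following sketch shows a DoFn that emits zero or one elements per input (the minimum-length threshold is a hypothetical example value):
// A sketch of the filtering use case: keep only words of at least a
// (hypothetical) minimum length, emitting zero or one elements per input.
static class FilterShortWordsFn extends DoFn<String, String> {
  private static final int MIN_LENGTH = 4; // illustrative threshold

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element().length() >= MIN_LENGTH) {
      c.output(c.element());
    }
  }
}

// Applying the filter to a PCollection<String> named words:
// PCollection<String> longWords = words.apply(ParDo.of(new FilterShortWordsFn()));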
GroupByKey
GroupByKey is a Beam transform for processing collections of key/value pairs. It's a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.
Let's examine the mechanics of GroupByKey with a simple example case, where our data set consists of words from a text file and the line number on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appears.
// The input PCollection.
PCollection<KV<String, String>> mapped = ...;
// Apply GroupByKey to the PCollection mapped.
// Save the result as the PCollection reduced.
PCollection<KV<String, Iterable<String>>> reduced =
mapped.apply(GroupByKey.<String, String>create());
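Once grouped, a ParDo can iterate over the values collected for each key. The following sketch formats each word together with the line numbers on which it appears (the variable name reduced comes from the snippet above):
// Consume the grouped result: one output element per unique word,
// listing all of the line numbers on which it appears.
PCollection<String> wordsWithLines =
    reduced.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String word = c.element().getKey();
        Iterable<String> lineNumbers = c.element().getValue();
        c.output(word + ": " + String.join(", ", lineNumbers));
      }
    }));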
CoGroupByKey
CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type.
Consider using CoGroupByKey if you have multiple data sets that provide information about related things. For example, let's say you have two different files with user data: one file has names and email addresses; the other file has names and phone numbers. You can join those two data sets, using the user name as a common key and the other data as the associated values. After the join, you have one data set that contains all of the information (email addresses and phone numbers) associated with each name.
In the Beam SDK for Java, CoGroupByKey accepts a tuple of keyed PCollections (PCollection<KV<K, V>>) as input. For type safety, the SDK requires you to pass each PCollection as part of a KeyedPCollectionTuple. You must declare a TupleTag for each input PCollection in the KeyedPCollectionTuple that you want to pass to CoGroupByKey. As output, CoGroupByKey returns a PCollection<KV<K, CoGbkResult>>, which groups values from all the input PCollections by their common keys. Each key (all of type K) will have a different CoGbkResult, which is a map from TupleTag<T> to Iterable<T>. You can access a specific collection in a CoGbkResult object by using the TupleTag that you supplied with the initial collection.
final List<KV<String, String>> emailsList =
Arrays.asList(
KV.of("amy", "amy@example.com"),
KV.of("carl", "carl@example.com"),
KV.of("julia", "julia@example.com"),
KV.of("carl", "carl@email.com"));
final List<KV<String, String>> phonesList =
Arrays.asList(
KV.of("amy", "111-222-3333"),
KV.of("james", "222-333-4444"),
KV.of("amy", "333-444-5555"),
KV.of("carl", "444-555-6666"));
PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList));
PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList));
After CoGroupByKey, the resulting data contains all data associated with each unique key from any of the input collections.
final TupleTag<String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();
final List<KV<String, CoGbkResult>> expectedResults =
Arrays.asList(
KV.of(
"amy",
CoGbkResult.of(emailsTag, Arrays.asList("amy@example.com"))
.and(phonesTag, Arrays.asList("111-222-3333", "333-444-5555"))),
KV.of(
"carl",
CoGbkResult.of(emailsTag, Arrays.asList("carl@email.com", "carl@example.com"))
.and(phonesTag, Arrays.asList("444-555-6666"))),
KV.of(
"james",
CoGbkResult.of(emailsTag, Arrays.asList())
.and(phonesTag, Arrays.asList("222-333-4444"))),
KV.of(
"julia",
CoGbkResult.of(emailsTag, Arrays.asList("julia@example.com"))
.and(phonesTag, Arrays.asList())));
The following code example joins the two PCollections with CoGroupByKey, followed by a ParDo to consume the result. Then, the code uses tags to look up and format data from each collection.
PCollection<KV<String, CoGbkResult>> results =
KeyedPCollectionTuple.of(emailsTag, emails)
.and(phonesTag, phones)
.apply(CoGroupByKey.create());
PCollection<String> contactLines =
results.apply(
ParDo.of(
new DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String name = e.getKey();
Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
String formattedResult =
Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
c.output(formattedResult);
}
}));
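The example above delegates formatting to Snippets.formatCoGbkResults, a helper that is not shown here. A minimal sketch of such a helper might look like the following (the exact output format of the real helper is an assumption):
// A possible formatting helper (hypothetical; the real
// Snippets.formatCoGbkResults may format its output differently).
static String formatCoGbkResults(
    String name, Iterable<String> emails, Iterable<String> phones) {
  return name + "; " + String.join(", ", emails) + "; " + String.join(", ", phones);
}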
Combine
Combine is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.
When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.
Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.
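For example, a sum over an Iterable of integers can be written as a plain SerializableFunction, which both Combine.globally and Combine.perKey accept (a minimal sketch):
// A simple combining function: sums an Iterable of Integers.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}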
The associativity and commutativity of a CombineFn allows runners to automatically apply some optimizations:
- Combiner lifting: This is the most significant optimization. Input elements are combined per key and window before they are shuffled, so the volume of data shuffled might be reduced by many orders of magnitude. Another term for this optimization is "mapper-side combine."
- Incremental combining: When you have a CombineFn that reduces the data size by a lot, it is useful to combine elements as they emerge from a streaming shuffle. This spreads out the cost of doing combines over the time that your streaming computation might be idle. Incremental combining also reduces the storage of intermediate accumulators.
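The following CombineFn computes the mean of a collection of integers. Its accumulator tracks a running sum and count, which is a type distinct from both the Integer inputs and the Double output: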
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public static class Accum {
int sum = 0;
int count = 0;
}
@Override
public Accum createAccumulator() { return new Accum(); }
@Override
public Accum addInput(Accum accum, Integer input) {
accum.sum += input;
accum.count++;
return accum;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
@Override
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
}
Combining a PCollection into a single value
Use the global combine to transform all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. The following example code shows how to apply the Beam-provided sum combine function to produce a single sum value for a PCollection of integers.
// Sum.ofIntegers() combines the elements in the input PCollection. The resulting PCollection, called sum,
// contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(Sum.ofIntegers()));
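Combine also has a per-key variant for PCollections of key/value pairs. For example, the AverageFn defined earlier can be applied to the values associated with each key (a sketch, assuming a hypothetical PCollection of per-player scores):
// Each element is a player name keyed to one of that player's scores (hypothetical data).
PCollection<KV<String, Integer>> playerScores = ...;

// Combine.perKey applies the CombineFn to the values for each key,
// producing one average score per player.
PCollection<KV<String, Double>> averageScores =
    playerScores.apply(Combine.<String, Integer, Double>perKey(new AverageFn()));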
Flatten
Flatten is a Beam transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection.
The following example shows how to apply a Flatten transform to merge multiple PCollection objects.
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
Partition
Partition is a Beam transform for PCollection objects that store the same data type. Partition splits a single PCollection into a fixed number of smaller collections.
Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).
The following example divides a PCollection into percentile groups.
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the
// partitioning function. In this example, we define the PartitionFn in-line. Returns a PCollectionList
// containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
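As noted above, the number of partitions can come from a command-line option as long as the value is available when the pipeline graph is built. A minimal sketch, assuming a hypothetical custom options interface:
// Hypothetical options interface: the partition count is supplied on the
// command line (e.g. --numPartitions=10) and read before graph construction.
public interface PartitionOptions extends PipelineOptions {
  @Default.Integer(10)
  int getNumPartitions();
  void setNumPartitions(int value);
}

// The value is fixed when the graph is built:
// int numPartitions = options.as(PartitionOptions.class).getNumPartitions();
// PCollectionList<Student> partitions = students.apply(Partition.of(numPartitions, partitionFn));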
Conclusion
In this blog, we have walked through the core transforms in Apache Beam with suitable examples.
Reference Link: https://en.wikipedia.org/wiki/Apache_Beam