How to read from one table and write to another in BigQuery using Apache Beam on Dataflow

Reading Time: 5 minutes

Have you ever wondered how to read from a table in GCP BigQuery, perform some aggregation on it, and finally write the output to another table using a Beam pipeline? In this blog we will achieve exactly that, using Cloud Dataflow as the runner.

The pipeline in this blog reads a weather samples table, finds the months in which tornadoes occurred, counts the number of occurrences per month, and writes the result to a new table in BigQuery.

Prerequisite for Beam Pipeline

  1. Maven
  2. IntelliJ
  3. Google Cloud SDK
  4. An existing GCP Project
  5. Java 8 JDK and basic Java knowledge
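
Before moving on, it is worth confirming that the toolchain is actually in place; a quick sanity check from any terminal:

 java -version
 mvn -version
 gcloud --version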

Enable the BigQuery, Dataflow, and Google Cloud Storage APIs in the API manager if they are not already enabled. This can take a few minutes.
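
If you prefer the command line over the API manager, the same APIs can be enabled with gcloud. This is a sketch, assuming the standard service names for these three products:

 gcloud services enable bigquery.googleapis.com dataflow.googleapis.com storage.googleapis.com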

Copy the Sample Table

  1. Search for BigQuery on the GCP console and create a dataset under the existing project.
  2. Now visit https://developers.google.com/bigquery/docs/dataset-gsod
  3. Scroll down; under Sample Tables, click on gsod.
  4. Copy this table to your newly created dataset. The gsod table is described in the excerpt below, and a bq command that performs the same copy follows it.
Sample tables

In addition to the public datasets, BigQuery provides a limited number of sample tables that you can query. These tables are contained in the bigquery-public-data:samples dataset.

The requirements for querying the BigQuery sample tables are the same as the requirements for querying the public datasets.

The bigquery-public-data:samples dataset includes the following tables:

  - gsod: Contains weather information collected by NOAA, such as precipitation amounts and wind speeds, from late 1929 to early 2010.
  - github_nested: Contains a timeline of actions such as pull requests and comments on GitHub repositories, with a nested schema. Created in September 2012.
  - github_timeline: Contains a timeline of actions such as pull requests and comments on GitHub repositories, with a flat schema. Created in May 2012.
  - natality: Describes all United States births registered in the 50 States, the District of Columbia, and New York City from 1969 to 2008.
  - shakespeare: Contains a word index of the works of Shakespeare, giving the number of times each word appears in each corpus.
  - trigrams: Contains English language trigrams from a sample of works published between 1520 and 2008.
  - wikipedia: Contains the complete revision history for all Wikipedia articles up to April 2010.
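
If you prefer the command line, the bq tool can perform the same copy. A sketch: the destination here assumes the project (bigquey), dataset (foo), and table name (tornado) used by the commands later in this post, so substitute your own identifiers.

 bq cp bigquery-public-data:samples.gsod bigquey:foo.tornado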

Java Beam Code

  1. Clone this repo: https://github.com/akipriyadarshi/beam_dataflow.git
  2. cd bigquerytest
  3. ls src/main/java/org/apache/beam/examples. Here you can see BigQueryTornadoes.java, which is our Beam Java code. The other Java files are sample files downloaded from the Apache Beam website.
  4. We can also see pom.xml, which contains the dependencies for Google Cloud, Dataflow, etc.
package org.apache.beam.examples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryTornadoes {
  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTornadoes.class);

  // Table to read from, in the form <project_id>:<dataset_id>.<table_id>.
  // Change this to your own copy of the gsod sample table (see below).
  private static final String WEATHER_SAMPLES_TABLE =
      "bigquey:foo.tornado";

  /**
   * Examines each row in the input table. If a tornado was recorded in that sample, the month in
   * which it occurred is output.
   */
  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row = c.element();
      if ((Boolean) row.get("tornado")) {
        c.output(Integer.parseInt((String) row.get("month")));
      }
    }
  }

  /** Formats each <month, tornado count> pair as a BigQuery TableRow. */
  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row =
          new TableRow()
              .set("month", c.element().getKey())
              .set("tornado_count", c.element().getValue());
      c.output(row);
    }
  }

  /**
   * Takes rows from the weather samples table, extracts the months with tornadoes, counts them,
   * and formats the counts as output rows.
   */
  static class CountTornadoes extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> rows) {

      // row... => month...
      PCollection<Integer> tornadoes = rows.apply(ParDo.of(new ExtractTornadoesFn()));

      // month... => <month,count>...
      PCollection<KV<Integer, Long>> tornadoCounts = tornadoes.apply(Count.perElement());

      // <month,count>... => row...
      PCollection<TableRow> results = tornadoCounts.apply(ParDo.of(new FormatCountsFn()));

      return results;
    }
  }

  /** Options supported by this pipeline. */
  public interface Options extends PipelineOptions {
    @Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
    @Default.String(WEATHER_SAMPLES_TABLE)
    String getInput();

    void setInput(String value);

    @Description("SQL Query to read from, will be used if Input is not set.")
    @Default.String("")
    String getInputQuery();

    void setInputQuery(String value);

    @Description("Read method to use to read from BigQuery")
    @Default.Enum("EXPORT")
    TypedRead.Method getReadMethod();

    void setReadMethod(TypedRead.Method value);

    @Description("Write method to use to write to BigQuery")
    @Default.Enum("DEFAULT")
    BigQueryIO.Write.Method getWriteMethod();

    void setWriteMethod(BigQueryIO.Write.Method value);

    @Description("Write disposition to use to write to BigQuery")
    @Default.Enum("WRITE_TRUNCATE")
    BigQueryIO.Write.WriteDisposition getWriteDisposition();

    void setWriteDisposition(BigQueryIO.Write.WriteDisposition value);

    @Description("Create disposition to use to write to BigQuery")
    @Default.Enum("CREATE_IF_NEEDED")
    BigQueryIO.Write.CreateDisposition getCreateDisposition();

    void setCreateDisposition(BigQueryIO.Write.CreateDisposition value);

    @Description(
        "BigQuery table to write to, specified as "
            + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
    @Validation.Required
    String getOutput();

    void setOutput(String value);
  }

  public static void applyBigQueryTornadoes(Pipeline p, Options options) {
    // Build the table schema for the output table.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    TypedRead<TableRow> bigqueryIO;
    if (!options.getInputQuery().isEmpty()) {
      bigqueryIO =
          BigQueryIO.readTableRows()
              .fromQuery(options.getInputQuery())
              .usingStandardSql()
              .withMethod(options.getReadMethod());
    } else {
      bigqueryIO =
          BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod());

      // Selected fields only applies when using Method.DIRECT_READ and
      // when reading directly from a table.
      if (options.getReadMethod() == TypedRead.Method.DIRECT_READ) {
        bigqueryIO = bigqueryIO.withSelectedFields(Lists.newArrayList("month", "tornado"));
      }
    }

    PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);

    rowsFromBigQuery
        .apply(new CountTornadoes())
        .apply(
            BigQueryIO.writeTableRows()
                .to(options.getOutput())
                .withSchema(schema)
                .withCreateDisposition(options.getCreateDisposition())
                .withWriteDisposition(options.getWriteDisposition())
                .withMethod(options.getWriteMethod()));
  }

  public static void runBigQueryTornadoes(Options options) {
    LOG.info("Running BigQuery Tornadoes with options " + options.toString());
    Pipeline p = Pipeline.create(options);
    applyBigQueryTornadoes(p, options);
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    runBigQueryTornadoes(options);
  }
}

Change the value of the variable WEATHER_SAMPLES_TABLE according to your project (projectid:datasetid.tableid).

Configuring the Service Account

  1. Search for IAM & Admin on the console and click IAM.
  2. For the Compute Engine default service account, click the pencil symbol on the right to add roles.
  3. Add the roles Dataflow Admin, BigQuery Admin, and Storage Admin.
  4. Now, under IAM & Admin, click Service Accounts; for the Compute Engine default service account, click Actions (the three dots) and then Manage Keys. Download the JSON key.
  5. Run the following command in a terminal, providing the path of the JSON file you downloaded and your own project and service account email:
  6. gcloud auth activate-service-account test-service-account@google.com --key-file=/path/key.json --project=testproject
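
As a quick sanity check, you can confirm that the service account is now the active credential:

 gcloud auth list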

Executing the Pipeline for Beam

  1. Open the project in IntelliJ.
  2. Run the following export command (number 3) in the IntelliJ terminal, putting in the path of your downloaded JSON:
  3. export GOOGLE_APPLICATION_CREDENTIALS="/home/knoldus/Downloads/project1-318909-33dd2dc6f789.json"
  4. Search for Cloud Storage on the GCP console and create two new buckets, with a temp folder inside each (a gsutil sketch for this follows the list).
  5. Run the command below in the IntelliJ terminal, changing the option values according to your project (project, gcpTempLocation, tempLocation, and output, i.e. where you want the new output table to be created).
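
If you would rather script the bucket setup, gsutil can create the bucket; this is a sketch using the bucket name and region from the example command below. Note that Cloud Storage has no real folders, so the tmp/ path simply comes into existence when Dataflow writes its first object under that prefix.

 gsutil mb -l us-central1 gs://apbucket2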

Run the pipeline using the command below:

 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.BigQueryTornadoes -Dexec.args="--runner=DataflowRunner --project=bigquey --region=us-central1 --gcpTempLocation=gs://apbucket2/tmp --tempLocation=gs://apbucket2/tmp --input=foo.tornado --output=foo.tornado_result" -Pdataflow-runner 
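
While the job runs, you can follow its progress on the Dataflow page of the console, or list it from the terminal (assuming the same region as passed above):

 gcloud dataflow jobs list --region=us-central1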

Wait for some time for the job to build and run; it took around 15 minutes for me. Then check for the new table, which will contain the number of tornadoes that occurred in each month.
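
One way to verify the output is a quick query against the new table; the table name here is taken from the --output option in the command above:

 bq query --use_legacy_sql=false 'SELECT month, tornado_count FROM foo.tornado_result ORDER BY month'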

You can visit https://blog.knoldus.com/big-data-processing-with-apache-beam/ for more details.
