
I'm attempting to write an Apache Beam (Python) pipeline that reads an input file from a GCP storage bucket, applies transformations, and then writes to BigQuery.

Here is an excerpt from the Apache Beam pipeline:

import argparse
import csv
import json
import logging

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def build_argument_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=True,
        help='Input file to process.')
    parser.add_argument(
        '--project',
        dest='project',
        required=True,
        help='Project ID.'
    )
    parser.add_argument(
        '--datasetId',
        dest='datasetId',
        required=True,
        help='BigQuery dataset ID.'
    )
    parser.add_argument(
        '--tableId',
        dest='tableId',
        required=True,
        help='BigQuery table ID.'
    )
    return parser


def create_pipeline_options(pipeline_arguments):
    pipeline_options = PipelineOptions(pipeline_arguments)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    return pipeline_options


def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    table_spec = bigquery.TableReference(
        projectId=known_args.project,
        datasetId=known_args.datasetId,
        tableId=known_args.tableId
    )

    table_schema = {
        'fields': [
            ...
        ]
    }

    with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
        input_data = p | "Read input csv" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)

        input_data_as_json = input_data | "Transforming to json" >> (
            beam.ParDo(
                TransformCsvToJson(build_field_mapping_names())
            )
        ).with_outputs('error', 'success')

        input_data_as_json.success | beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            additional_bq_parameters={'timePartitioning': {
                'type': 'MONTH'
            }}
        )


if __name__ == '__main__':
    run()

Here is the script to execute the pipeline:

#!/usr/bin/env bash

python ../pipelines/pipeline.py \
    --input "gs://storage/input_file.csv" \
    --runner DataflowRunner \
    --project "project_name" \
    --datasetId "dataset_name" \
    --tableId "table_name" \
    --region europe-west2 \
    --staging_location "gs://storage/staging" \
    --temp_location "gs://storage/temp"

Here is the error I get:

/bin/zsh path/utils/execute_pipeline.sh
Traceback (most recent call last):
  File "../pipelines/pipeline.py", line 138, in <module>
    run()
  File "../pipelines/pipeline.py", line 117, in run
    with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
  File "/environment_path/lib/python3.8/site-packages/apache_beam/pipeline.py", line 202, in __init__
    raise ValueError(
ValueError: Pipeline has validations errors: 
Missing required option: project.

Some additional findings (a diagnostic sketch follows this list):

  1. When I use DirectRunner, I don't get this error.
  2. When I use DataflowRunner, the pipeline works when it doesn't have to write to BigQuery.
  3. When I use DataflowRunner and hard-code the project, datasetId and tableId, it works fine.
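
Here is a minimal diagnostic sketch (not from the original post): the project option that Dataflow validates can be read straight off whatever ends up in pipeline_args. The argument list below only mimics what the execute script would produce if --project were swallowed by argparse; the values are placeholders.

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

# pipeline_args is whatever parse_known_args left over for Beam;
# placeholder values mirroring the execute script.
pipeline_args = ['--runner', 'DataflowRunner', '--region', 'europe-west2',
                 '--temp_location', 'gs://storage/temp']

options = PipelineOptions(pipeline_args)
# Prints None, which is what triggers "Missing required option: project"
# once the Dataflow runner validates the options.
print(options.view_as(GoogleCloudOptions).project)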

UPDATE

I found the culprit, though I'm not sure why this is happening...

Making the following modification works:

def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    # this bit below is new
    pipeline_args.extend([
        '--project=' + known_args.project
    ])

When I printed known_args and pipeline_args, "project" wasn't showing up, though I'm not sure why it isn't picking up the value from the execute script.
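
The split can be reproduced outside Beam with plain argparse; this is only an illustrative sketch, with placeholder argument values mirroring the execute script:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=True)
parser.add_argument('--project', dest='project', required=True)

# Simulated command line (placeholder values).
argv = [
    '--input', 'gs://storage/input_file.csv',
    '--project', 'project_name',
    '--runner', 'DataflowRunner',
    '--region', 'europe-west2',
]

known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args)     # Namespace(input='gs://storage/input_file.csv', project='project_name')
print(pipeline_args)  # ['--runner', 'DataflowRunner', '--region', 'europe-west2']
# --project is consumed by argparse, so it never reaches pipeline_args.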

  • What version of Beam are you using? Commented Dec 14, 2020 at 17:17
  • @Pablo Beam version = 2.25 Commented Dec 14, 2020 at 17:35
  • @Pablo edited original post with some additional findings. Commented Dec 14, 2020 at 17:38

1 Answer

The line known_args, pipeline_args = parser.parse_known_args(argv) splits the arguments into two parts: those your parser knows about (known_args) and those it doesn't (pipeline_args). Because you also have a flag named project, its value is placed into known_args and never makes it to pipeline_args. You can either make sure the arguments in your parser are disjoint from those expected to be passed via pipeline_args, or you can augment pipeline_args after parsing, as you have done.
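
To make that concrete, here is a rough standalone sketch of both options (the --bq_project flag name and the placeholder values are made up for illustration; they are not from the original pipeline):

import argparse

# Option 1: keep your own flags disjoint from Beam's. A hypothetical
# --bq_project flag carries the value you need for the TableReference,
# while --project is left unparsed so it stays in pipeline_args and
# reaches Dataflow's option validation.
argv = ['--bq_project', 'my-project', '--project', 'my-project',
        '--runner', 'DataflowRunner']
parser = argparse.ArgumentParser()
parser.add_argument('--bq_project', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
print(pipeline_args)  # ['--project', 'my-project', '--runner', 'DataflowRunner']

# Option 2: keep --project in your own parser, but forward it back into
# pipeline_args after parsing, as the question's update does.
parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.append('--project=' + known_args.project)
print(pipeline_args)  # ['--bq_project', 'my-project', '--runner', 'DataflowRunner', '--project=my-project']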

4 Comments

Could you explain a bit more what you mean by "You can either make sure the arguments in your parser are disjoint from those expected to be passed via pipeline_args", please? I forgot to mention above that if I remove the parser.add_argument('--project', ...) part, project is still not being picked up by pipeline_args or known_args.
With parser.add_argument('--project', ...) removed, this is what's in known_args and pipeline_args: known_args: Namespace(datasetId='datasetId', input='gs://input/input_file.csv', projectId='projectId', tableId='tableId'); pipeline_args: ['--runner', 'DataflowRunner', '--region', 'europe-west2', '--temp_location', 'gs://temp_location/']
Are you still passing --project as a flag?
Yes, I'm still passing project in the shell script.
