
I'm writing a simple Beam job to copy data from a GCS bucket over to BigQuery. The code looks like the following:

import sys
from apache_beam.options.pipeline_options import GoogleCloudOptions
import apache_beam as beam

pipeline_options = GoogleCloudOptions(flags=sys.argv[1:])
pipeline_options.project = PROJECT_ID
pipeline_options.region = 'us-west1'
pipeline_options.job_name = JOB_NAME
pipeline_options.staging_location = BUCKET + '/binaries'
pipeline_options.temp_location = BUCKET + '/temp'

schema = 'id:INTEGER,region:STRING,population:INTEGER,sex:STRING,age:INTEGER,education:STRING,income:FLOAT,statusquo:FLOAT,vote:STRING'
p = (beam.Pipeline(options = pipeline_options)
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('Chile.csv')
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:tmp.dummy', schema = schema))

Here we're writing to the table tmp.dummy in the project named project. Running this results in the following stack trace:

Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 151, in _run_module_as_main
    mod_name, loader, code, fname = _get_module_details(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 101, in _get_module_details
    loader = get_loader(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 464, in get_loader
    return find_loader(fullname)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 474, in find_loader
    for importer in iter_importers(fullname):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 430, in iter_importers
    __import__(pkg)
  File "WriteToBigQuery.py", line 49, in <module>
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(str(PROJECT_ID + ':' + pipeline_options.write_file), schema = schema))
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1337, in __init__
    self.table_reference = _parse_table_reference(table, dataset, project)
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 309, in _parse_table_reference
    if isinstance(table, bigquery.TableReference):
AttributeError: 'module' object has no attribute 'TableReference'

It looks like some import is going wrong somewhere; is it possible this has resulted from using the GoogleCloudOptions pipeline option?

2 Answers


I got the same error. I realized I had installed the wrong Apache Beam package: you need to add [gcp] to the package name when installing Apache Beam.

sudo pip install apache_beam[gcp]

A few more optional installs fix related installation errors, and you're good to go:

sudo pip install oauth2client==3.0.0
sudo pip install httplib2==0.9.2
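
To double-check that the [gcp] extras are actually present in your environment, a quick sanity check (my own addition, not part of the original install steps) is to import the generated BigQuery client module that WriteToBigQuery relies on:

# Should print the TableReference class rather than raising an error,
# which is exactly what the stack trace in the question complains about.
from apache_beam.io.gcp.internal.clients import bigquery

print(bigquery.TableReference)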

2 Comments

I was trying to install apache_beam[gcp] on a Mac with no luck, but this solved the issue on a Linux distribution.
Just put quotes around apache_beam[gcp] (i.e. pip install 'apache_beam[gcp]'); it's a zsh problem.

I ran some tests and was unable to reproduce your issue; does the dataset already exist? (If not, see the note at the end of this answer for one way to create it.) The following snippet worked for me (I'm using an answer for better formatting):

import apache_beam as beam
import sys

PROJECT='PROJECT_ID'
BUCKET='BUCKET_NAME'
schema = 'id:INTEGER,region:STRING'

class Split(beam.DoFn):

    def process(self, element):
        id, region = element.split(",")

        return [{
            'id': int(id),
            'region': region,
        }]

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)

    (p
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/staging/dummy.csv'.format(BUCKET))
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:test.dummy'.format(PROJECT), schema=schema)
    )

    p.run()

if __name__ == '__main__':
    run()

where dummy.csv contains:

$ cat dummy.csv 
1,us-central1 
2,europe-west1 

and the output in BigQuery contains the two expected rows (id 1, us-central1 and id 2, europe-west1).

Some relevant dependencies used:

apache-beam==2.4.0
google-cloud-bigquery==0.25.0
google-cloud-dataflow==2.4.0
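
Regarding the "does the dataset already exist?" question above: if it doesn't, one way to create it up front is with the BigQuery client library. This is a minimal sketch of my own, assuming the old-style API of the google-cloud-bigquery 0.25.0 release listed above; the project and dataset IDs are placeholders matching the snippet:

from google.cloud import bigquery

# Placeholder project ID; the dataset name matches '{0}:test.dummy' above.
client = bigquery.Client(project='PROJECT_ID')
dataset = client.dataset('test')

# Pre-0.28 API: the Dataset object exposes exists()/create().
# Newer google-cloud-bigquery releases use client.create_dataset() instead.
if not dataset.exists():
    dataset.create()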

