
Trying to implement a simpler version of this example, I get an error while inserting data into BigQuery.

This is the code

from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
    def parse_method(self, string_input):
        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(zip('Mensaje',values))
        return row



def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input', required=False,
        help='Input file to read.  This can be a local file or '
             'a file in a Google Storage Bucket.',
        default=r'C:\XXXX\prueba.csv')

    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='PruebasIoT.TablaIoT')

    known_args, pipeline_args = parser.parse_known_args(argv)

    data_ingestion = DataIngestion()

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input,
                                                  skip_header_lines=1)

     | 'String To BigQuery Row' >> beam.Map(lambda s:
                                            data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             known_args.output,
             schema='Mensaje:STRING'))
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    #  logging.getLogger().setLevel(logging.INFO)
    run()

And this is the error:

RuntimeError: Could not successfully insert rows to BigQuery table [XXX]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u'm'
 message: u'no such field.'
 reason: u'invalid'>]
 index: 0>, <InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u'm'
 message: u'no such field.'
 reason: u'invalid'>]
 index: 1>]

I'm new to Python and maybe the solution is quite simple, but how could I do it?

Would it be possible to pass a single string in 'String To BigQuery Row' instead of

'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))

That would be an easier way to start than using CSV files and having to translate the file.

2 Answers


I understand you have an input CSV file with a single column, of the form:

Message
This is a message
This is another message
I am writing to BQ

If my understanding is correct, you do not need the parse_method() method: as explained in the sample you shared, it is just a helper that maps the CSV values to dictionaries (which are what beam.io.BigQuerySink accepts).

Then, you can simply do something like:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: dict(Message = s))
 | 'Write to BigQuery' >> beam.io.Write(
    beam.io.BigQuerySink(known_args.output, schema='Message:STRING')))

p.run().wait_until_finish()

Note that the only relevant difference is that the "String to BigQuery Row" mapping no longer needs a complex method; all it does is create a Python dictionary like {'Message': 'This is a message'}, where Message is the name of the column in your BQ table. Here, s is each of the String elements read by the beam.io.ReadFromText transform, and we apply a lambda function.
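As a plain-Python sanity check (no Beam needed; the sample string is made up), you can compare this mapping with the zip-based one from the question. Zipping against a bare string iterates over its characters, which is consistent with BigQuery rejecting a field named 'm' ("no such field"):

```python
s = "This is a message"

# The mapping used in the answer: exactly one key, named after the BQ column.
row = dict(Message=s)
assert row == {"Message": "This is a message"}

# The original code zipped against the bare string 'Mensaje', so zip pairs
# the first *letter* with the value and produces a one-letter field name.
bad = dict(zip("Mensaje", [s]))
assert bad == {"M": "This is a message"}
```

This is why the pipeline in the question failed: the dictionaries it emitted had a field 'M', which does not exist in the table schema.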



To solve this using a CSV file with only one value per row, I had to use this:

    values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
    row = dict(zip(('Name',),values))

The "," after 'Name' is needed because ('Name',) is a one-element tuple; without the comma, ('Name') is just the string 'Name', so zip iterates over its characters and pairs each letter with a value instead of pairing the whole column name with the first value.
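A quick plain-Python illustration (the sample value is made up) of the difference between zipping with a one-element tuple and with a bare string:

```python
values = ["some value"]

# ('Name',) is a one-element tuple: the whole column name is one item.
good = dict(zip(("Name",), values))
assert good == {"Name": "some value"}

# 'Name' alone is an iterable of characters, so zip pairs 'N' with the value.
bad = dict(zip("Name", values))
assert bad == {"N": "some value"}
```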

