I am trying to implement a simpler version of this example, and I get an error while inserting data into BigQuery.
This is the code:
from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class DataIngestion:
    """Helper that turns lines of the input CSV file into BigQuery rows."""

    def parse_method(self, string_input):
        """Translate one CSV line into a dict keyed by BigQuery column name.

        Double quotes and CR/LF sequences are removed from the line, which is
        then split on commas. The resulting values are paired positionally
        with the column names; values beyond the known columns are dropped.

        Args:
            string_input: A single comma-separated line of the input file.

        Returns:
            A dict of the form {'Mensaje': <first value of the line>}.
        """
        cleaned = re.sub('\r\n', '', re.sub(u'"', '', string_input))
        values = re.split(",", cleaned)
        # The column names must be a sequence of names. Zipping against the
        # bare string 'Mensaje' iterates over its characters and produces
        # keys such as 'M' and 'e', which do not exist in the table schema.
        row = dict(zip(('Mensaje',), values))
        return row
def run(argv=None):
    """The main function which creates the pipeline and runs it.

    Reads text lines from the --input file (skipping the first line),
    converts each line to a row dict with DataIngestion.parse_method and
    writes the rows to the BigQuery table given by --output.

    Args:
        argv: Optional list of command-line arguments. When None, argparse
            reads them from sys.argv. Arguments not declared here are
            forwarded to PipelineOptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input', required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Storage Bucket.',
        # Raw string: '\X' and '\p' are not valid escape sequences, so a
        # plain literal triggers an invalid-escape warning on Python 3.
        # The resulting value is unchanged.
        default=r'C:\XXXX\prueba.csv')
    parser.add_argument(
        '--output', dest='output', required=False,
        help='Output BQ table to write results to.',
        default='PruebasIoT.TablaIoT')
    known_args, pipeline_args = parser.parse_known_args(argv)

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # NOTE(review): BigQuerySink appears to be superseded by
    # beam.io.WriteToBigQuery in newer Beam releases -- confirm against the
    # installed version before migrating.
    (p
     | 'Read from a File' >> beam.io.ReadFromText(
         known_args.input, skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(
         lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             known_args.output,
             schema='Mensaje:STRING')))
    # Block until the pipeline has finished.
    p.run().wait_until_finish()
if __name__ == '__main__':
    # Script entry point. Uncomment the next line to set the root logger
    # level to INFO.
    # logging.getLogger().setLevel(logging.INFO)
    run()
And this is the error:
RuntimeError: Could not successfully insert rows to BigQuery table [XXX]. Errors: [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u'm'
message: u'no such field.'
reason: u'invalid'>]
index: 0>, <InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u'm'
message: u'no such field.'
reason: u'invalid'>]
index: 1>]
I'm new to Python and maybe the solution is quite simple, but how could I do it?
Would it be possible to pass a single string in 'String To BigQuery Row' instead of
'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
This would be an easier way to get started than using CSV files and having to translate the file.