
I am getting the error below while inserting messages from PubSubIO into BigQuery.

How can I insert a record from Pub/Sub into BQ? Can we convert a PCollection into a list, or is there an alternative?

AttributeError: 'PCollection' object has no attribute 'split'

Here is my code:

def create_record(columns):
    #import re
    schema_dict = {}
    # record_ids is the PCollection defined below, not a string, so this line
    # raises: AttributeError: 'PCollection' object has no attribute 'split'
    col_value=record_ids.split('|')
    col_name=columns.split(",")
    for i in range(len(col_name)):
        schema_dict[col_name[i]]=col_value[i]
    return schema_dict

schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'
columns="tungsten_opcode,tungsten_seqno"
lines = (p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC)
           | beam.WindowInto(window.FixedWindows(15)))
record_ids = lines | 'Split' >> (
    beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_record(columns))
records | 'BqInsert' >> beam.io.WriteToBigQuery(
    OUTPUT,
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
  • Could you please format the code properly. Commented Jun 22, 2018 at 14:59
  • Is there any alternative way to load Pub/Sub messages into BQ using Dataflow? Commented Jun 22, 2018 at 15:00
  • Formatted the code Commented Jun 22, 2018 at 18:08
  • Any help is much appreciated. Commented Jun 24, 2018 at 6:21

1 Answer


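This needs to be done as a transform: you can't directly access the data in a PCollection, because it is a handle to a (possibly unbounded) distributed dataset, not an in-memory list.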
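As a minimal sketch, the question's create_record can be rewritten to take one message string as its first argument, so Beam calls it once per element instead of it trying to read the record_ids PCollection. It assumes pipe-delimited messages, as in the question's split('|'); the delimiter parameter is a hypothetical addition. Note that the question's beam.Map(create_record(columns)) calls the function once while the pipeline is being built, whereas beam.Map(create_record, columns) passes the function itself and forwards columns to every call.

```python
def create_record(element, columns, delimiter='|'):
    """Build one BigQuery row (a dict) from ONE message string.

    Hypothetical rewrite of the question's helper: `element` is a single
    string delivered by Beam, never a PCollection.
    """
    col_names = columns.split(',')
    col_values = element.split(delimiter)
    # zip pairs each column name with the value at the same position
    return dict(zip(col_names, col_values))


# Wiring (assumes the question's `record_ids` PCollection and `columns` string):
#   records = record_ids | 'CreateRecords' >> beam.Map(create_record, columns)

print(create_record('I|42', 'tungsten_opcode,tungsten_seqno'))
```

Because zip stops at the shorter sequence, a message with missing fields yields a dict with fewer keys rather than the IndexError the original index-based loop would raise.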

Write a DoFn class that splits each record, receives the schema as an extra argument to process(), and builds a dict mapping column names to values, e.g.:

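import apache_beam as beam

class CreateRecord(beam.DoFn):
  def process(self, element, schema):
    # Split one comma-delimited message into its values
    cols = element.split(',')
    # Column names are the part before ':' in each 'name:TYPE' schema entry
    header = [x.split(':')[0] for x in schema.split(',')]
    # Emit a single dict (column name -> value) for WriteToBigQuery
    return [dict(zip(header, cols))]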

Apply the transform like this:

schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'
records = record_ids | 'CreateRecords' >> beam.ParDo(CreateRecord(), schema)
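The DoFn above leaves every value as a string, including tungsten_seqno, which the schema declares as INTEGER. A minimal sketch of a variant that also casts each value using the type named in the schema string; parse_schema, to_bq_row and the cast table are hypothetical, only INTEGER and FLOAT are converted, and every other type is left as a string. Like the answer, it assumes comma-delimited messages.

```python
# Hypothetical cast table: BigQuery type name -> Python converter.
# Types not listed here (STRING, TIMESTAMP, ...) are kept as strings.
_CASTS = {'INTEGER': int, 'FLOAT': float}


def parse_schema(schema):
    """Turn 'name:TYPE,name:TYPE' into [(name, TYPE), ...]."""
    return [tuple(field.split(':')) for field in schema.split(',')]


def to_bq_row(element, schema, delimiter=','):
    """Map one delimited message to a dict, casting values per schema type."""
    row = {}
    for (name, bq_type), raw in zip(parse_schema(schema), element.split(delimiter)):
        cast = _CASTS.get(bq_type.upper(), str)
        row[name] = cast(raw)
    return row


# Wiring (assumes the `record_ids` PCollection and `schema` string above):
#   records = record_ids | 'CreateRecords' >> beam.Map(to_bq_row, schema)

print(to_bq_row('I,42', 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'))
```

Parsing the schema on every element is cheap for a two-column schema; for wider schemas the parsed list could be computed once and passed in instead.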