I want to upsert entities to Datastore using Apache Beam. Before the WriteToDatastore step, I added a custom DoFn that:
- takes an entity from the import step,
- checks whether that entity already exists in Datastore,
- if it does, extracts the value of a property and concatenates it with the value from the new entity,
- outputs the entity.

Example: my data consists of the columns parent_id, nationality and child_name. The input rows are newly born children. Before upserting to Datastore, I want to fetch the parent's existing children and append the new value.

with beam.Pipeline(options=options) as p:

    (p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
     | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict())
     | 'Create entities' >> beam.ParDo(CreateEntities())
     | 'Update entities' >> beam.ParDo(UpdateEntities())
     | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
     )

The ParDo that takes the most time is 'Update entities':

class UpdateEntities(beam.DoFn):
    """Updates Datastore entity"""
    def process(self, element):
        query = query_pb2.Query()
        parent_key = entity_pb2.Key()
        parent = datastore_helper.get_value(element.properties['parent_id'])
        datastore_helper.add_key_path(parent_key, kind, parent)
        parent_key.partition_id.namespace_id = datastore_helper.get_value(element.properties['nationality'])
        query.kind.add().name = kind
        datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.EQUAL, parent_key)

        req = helper.make_request(project=PROJECT, namespace=parent_key.partition_id.namespace_id, query=query)
        resp = helper.get_datastore(PROJECT).run_query(req)

        if len(resp.batch.entity_results) > 0:
            existing_entity = resp.batch.entity_results[0].entity
            existing_child_name_v = datastore_helper.get_value(existing_entity.properties['child_name'])
            new_child_names = existing_child_name_v + ';' + datastore_helper.get_value(element.properties['child_name'])
            datastore_helper.set_value(element.properties['child_name'], new_child_names)
        return [element]

1 Answer

It's not surprising that UpdateEntities is the slowest part of your Beam pipeline: it makes an RPC for each and every element it processes. (As a side note, you should use a get/lookup by key instead of a query filtered on the key, because queries on keys are eventually consistent.) As long as UpdateEntities does an RPC per element, it will be the slowest part of your job.
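
If the lookups have to stay inside the step, one possible mitigation is to fetch existing values for a whole batch of rows in a single call instead of one call per row. The following is only a sketch under assumptions: rows are plain dicts with the three columns from the question, `lookup_many` is a hypothetical callable standing in for whatever batched Datastore lookup you use, and the batch size of 500 is arbitrary. It has not been run against Datastore.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Key = Tuple[str, str]  # (nationality, parent_id); hypothetical key shape
Row = Dict[str, str]


def batched(items: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield consecutive lists of at most `size` items."""
    batch: List[Row] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def merge_with_existing(
    rows: Iterable[Row],
    lookup_many: Callable[[List[Key]], Dict[Key, str]],  # hypothetical batched lookup
    batch_size: int = 500,  # arbitrary; tune to the limits of your lookup call
) -> List[Row]:
    """Append each new child_name to the parent's existing names."""
    merged: List[Row] = []
    for batch in batched(rows, batch_size):
        keys = list({(r["nationality"], r["parent_id"]) for r in batch})
        existing = dict(lookup_many(keys))  # one call per batch, not per row
        for r in batch:
            key = (r["nationality"], r["parent_id"])
            old = existing.get(key)
            names = r["child_name"] if old is None else old + ";" + r["child_name"]
            # Remember the merged value so a second child of the same parent
            # in this batch is appended to it rather than overwriting it.
            existing[key] = names
            merged.append({**r, "child_name": names})
    return merged
```

Note that this only accumulates names within one batch; two children of the same parent that land in different batches would still race, just as they do in the per-element version.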

2 Comments

Thank you. Are you saying I should replace the query above with a ReadFromDatastore transform used as a side input to the pipeline and then do the lookup there? Or something else?
I didn't actually have a suggested fix ;) That said, yes, you'll want to find a way to do all of your loads in parallel with the other work and then merge your lookup results with your other steps. The important point is that a ParDo function containing an RPC will be slow compared to ParDo functions without any RPCs.
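
A rough sketch of what that merge step could look like, assuming both the existing entities and the new rows have already been reduced to `((nationality, parent_id), child_name)` pairs and grouped with `beam.CoGroupByKey()` under the tags `"existing"` and `"new"`. Those tags and the function name `merge_grouped` are made up for illustration, and the function is plain Python so it can be tried without a pipeline.

```python
from typing import Dict, Iterable, List, Tuple

Key = Tuple[str, str]  # (nationality, parent_id); hypothetical key shape


def merge_grouped(grouped: Tuple[Key, Dict[str, Iterable[str]]]) -> List[Tuple[Key, str]]:
    """Merge one CoGroupByKey result into a single (key, child_names) pair.

    `grouped` has the shape CoGroupByKey produces for a dict of inputs:
    (key, {"existing": <names already stored>, "new": <names from the input>}).
    """
    key, sources = grouped
    existing = list(sources["existing"])
    new = list(sources["new"])
    if not new:
        return []  # nothing new for this parent, so nothing to upsert
    # At most one stored value is expected per parent; new names are appended.
    names = existing[:1] + new
    return [(key, ";".join(names))]


# Hypothetical wiring inside the pipeline (not executed here):
#   ({"existing": keyed_existing, "new": keyed_new}
#    | beam.CoGroupByKey()
#    | beam.FlatMap(merge_grouped))
```

With this shape, the Datastore read happens once as its own source and runs in parallel with parsing the input file, and the merge itself does no RPCs. The trade-off is that the read covers every entity the source query returns, not only the parents present in the input.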
