
I am writing a Dataflow job which reads from BigQuery and does a few transformations.

data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
    | beam.Map(print)
)

But my requirement is to read from BigQuery only after receiving a notification from a PubSub topic. The above Dataflow job should start reading data from BigQuery only if the message below is received. If it has a different job id or a different status, then no action should be taken.

PubSub message: {'job_id': 101, 'status': 'Success'}

Any help on this part?

  • Alternatively, you can use Beam's Wait transform (example here: rmannibucau.metawerx.net/post/…). (I am not the author of the post.)

2 Answers


That is fairly easy; the code would look like this:

import json

pubsub_msg = (
    pipeline
    # note: ReadFromPubSub takes either a topic or a subscription, not both
    | beam.io.gcp.pubsub.ReadFromPubSub(subscription=my_subscription)
    # PubSub payloads arrive as bytes; decode them into dicts first
    | beam.Map(json.loads)
)

bigquery_data = (
    pubsub_msg
    | beam.Filter(lambda msg: msg['job_id'] == 101)   # you might want to use a more sophisticated filter condition
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
)
bigquery_data | beam.Map(print)

However, if you do it like that, you will have a streaming Dataflow job running (indefinitely, or until you cancel the job), since using ReadFromPubSub automatically results in a streaming job. Consequently, this does not start a Dataflow job when a message arrives in PubSub; rather, one job is already running and listening to the topic for something to do.

If you actually want to trigger a Dataflow batch job, I would recommend using a Dataflow template and starting this template with a Cloud Function which listens to your PubSub topic. The filtering logic would then live within this Cloud Function (as a simple if condition).
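For illustration, such a Cloud Function could look roughly like this. This is a sketch, not code from the original answer: it assumes a classic template already staged in GCS, and PROJECT, REGION, TEMPLATE_PATH and the job name are placeholder values you would replace with your own.

import base64
import json

from googleapiclient.discovery import build

PROJECT = 'my-project'                                  # placeholder: your GCP project id
REGION = 'europe-west1'                                 # placeholder: your Dataflow region
TEMPLATE_PATH = 'gs://my-bucket/templates/my-template'  # placeholder: GCS path of the staged template

def trigger_dataflow(event, context):
    """Background Cloud Function triggered by a message on the PubSub topic."""
    # PubSub delivers the payload base64-encoded in event['data'].
    message = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    # the filtering logic from the question, as a simple if condition
    if message.get('job_id') != 101 or message.get('status') != 'Success':
        return  # different job id or status: do nothing

    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
    request = dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE_PATH,
        body={'jobName': 'bq-read-{}'.format(message['job_id'])},
    )
    print(request.execute())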


2 Comments

Thanks for your answer. But how can I check if the status is 'Success' for the job_id and only then proceed to the next step? The code looks like it will move to the ReadFromBigQuery task even if the Filter doesn't give any results.
beam.Filter expects a custom function that returns either true or false. Only those elements for which the function returns true are propagated down the line. So if your function always returns false, ReadFromBigQuery is never executed. You may use any complex custom function you like (including checking on status in your case); see the documentation.
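For illustration, such a filter function could look roughly like this (a sketch; it assumes the PubSub payload has already been decoded into a dict, as in the pipeline above):

def is_successful_run(msg, expected_job_id=101):
    # keep only messages for the expected job id whose status is 'Success'
    return msg.get('job_id') == expected_job_id and msg.get('status') == 'Success'

# used in the pipeline as: pubsub_msg | beam.Filter(is_successful_run) | ...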

I ended up using Cloud Functions, added the filtering logic in it, and started the Dataflow job from there. I found the link below useful: How to trigger a dataflow with a cloud function? (Python SDK)

