
We currently have a Python Apache Beam pipeline that works and can be run locally. We are now in the process of having the pipeline run on Google Cloud Dataflow in a fully automated fashion, but have found a limitation in Dataflow/Apache Beam's pipeline monitoring.

Currently, Cloud Dataflow has two ways of monitoring pipeline status: the web UI and gcloud on the command line. Neither of these works well for a fully automated solution where we need to account for lossless file processing.

Looking at Apache Beam's GitHub repository, the file internal/apiclient.py contains a function, get_job, that is used to get the status of a job.

The one place we have found get_job used is in runners/dataflow_runner.py.

The end goal is to use this API to get the status of a job, or of several jobs that we trigger automatically, to ensure they are all eventually processed successfully through the pipeline.

Can anyone explain to us how this API can be used after we run our pipeline (p.run())? We do not understand where runner in response = runner.dataflow_client.get_job(job_id) comes from.

If someone could provide a larger understanding of how we can access this API call while setting up / running our pipeline that would be great!

1 Answer


I ended up just fiddling around with the code and found out how to get the job details. Our next step is to see if there is a way to get a list of all of the jobs.

# import paths as of the SDK version we used; newer Beam releases moved these
# under apache_beam.runners.dataflow (e.g. apache_beam.runners.dataflow.internal.apiclient)
from apache_beam.internal import apiclient
from apache_beam.runners import dataflow_runner

# start the pipeline process
pipeline                 = p.run()
# get the job_id for the current pipeline and store it somewhere
job_id                   = pipeline.job_id()
# set up a job_version variable (either batch or streaming)
job_version              = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
# set up "runner", which is just a dictionary; I call it local
local                    = {}
# create a dataflow_client
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version)
# get the job details from the dataflow_client
print(local['dataflow_client'].get_job(job_id))
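Not part of the original answer, but the overall monitoring loop the question asks for (trigger several jobs, then poll until every one finishes) can be sketched without any SDK-specific calls. Here get_state is a hypothetical stand-in for whatever returns a job's current state (e.g. the get_job call above), and the terminal-state set is a common subset of Dataflow's JOB_STATE_* values:

```python
import time

# Subset of Dataflow's terminal JOB_STATE_* values; extend as needed
# (e.g. JOB_STATE_DRAINED, JOB_STATE_UPDATED).
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

def wait_for_jobs(job_ids, get_state, poll_seconds=30, sleep=time.sleep):
    """Block until every job reaches a terminal state.

    get_state(job_id) should return the job's current state string.
    Returns a dict mapping job_id -> final state.
    """
    final = {}
    pending = set(job_ids)
    while pending:
        for job_id in list(pending):
            state = get_state(job_id)
            if state in TERMINAL_STATES:
                final[job_id] = state
                pending.discard(job_id)
        if pending:
            sleep(poll_seconds)
    return final
```

After the loop returns, jobs whose final state is not JOB_STATE_DONE can be retried or flagged, which is what the lossless-processing requirement in the question comes down to. Note that newer Beam SDKs also expose result.wait_until_finish() and result.state on the object returned by p.run(), which removes the need for a hand-rolled client for the single-job case.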

2 Comments

Hey @T.Okahara, any chance you have figured out how to do this with a Dataflow template?
Sorry @jmoore255, other than the code above we didn't work any further on getting our pipelines running in Cloud Dataflow. We actually built our own locally running machine to run our processes on, since we found other issues with Dataflow, like not being able to trigger from App Engine and the slow startup/cleanup times. It might be different now, but we still run our pipelines (doing data munging for ML) locally.
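For anyone landing here for the "list of all of the jobs" follow-up mentioned in the answer: a sketch using google-api-python-client against the Dataflow REST API (v1b3), which exposes a projects.locations.jobs.list method. list_dataflow_jobs and build_service are illustrative names, not part of Beam; the service object is injected so the pagination logic can be exercised without credentials:

```python
def list_dataflow_jobs(service, project_id, location="us-central1"):
    """Return all Dataflow jobs for a project/location, following pagination."""
    jobs = []
    request = service.projects().locations().jobs().list(
        projectId=project_id, location=location)
    while request is not None:
        response = request.execute()
        jobs.extend(response.get("jobs", []))
        # list_next returns None once there are no more pages
        request = service.projects().locations().jobs().list_next(
            request, response)
    return jobs

def build_service():
    # Requires: pip install google-api-python-client, plus application
    # default credentials (gcloud auth application-default login).
    from googleapiclient.discovery import build
    return build("dataflow", "v1b3")
```

Each returned job dict carries an id and currentState, which plugs directly into the status-polling approach from the answer above.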
