I have a Python script that runs a Beam/Dataflow job:
# add config
p = beam.Pipeline(options=pipeline_options)

# multiple Dataflow steps . . .
# in one of the steps I deliberately raise an exception to make the Dataflow job fail

result = p.run()
job_status = result.wait_until_finish()
if job_status == "FAILED":
    # run something else
As in the code above, I am trying to handle the case where the Dataflow job fails, so that a follow-up process runs on failure. But after trying both the DirectRunner and the DataflowRunner, the script just ends at the exception I raised in the function. For the success case the check works fine, e.g. job_status == "DONE".
wait_until_finish() returns the final state of the pipeline, so I thought I could use it to handle the failed job, but it is not working. Any ideas?
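For context, as far as I can tell the state values are the string constants defined on PipelineState, so I don't think the comparison itself is the problem:

from apache_beam.runners.runner import PipelineState

job_status = result.wait_until_finish()
# PipelineState.DONE == 'DONE', PipelineState.FAILED == 'FAILED'
if job_status == PipelineState.FAILED:
    # never reached in my case
    ...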
The log in the Dataflow console shows the exception I raised, and the job ends there without running anything in my if condition.
Edit:
I found in the Dataflow documentation/source that wait_until_finish() handles a non-success state like the snippet below. Not sure if there's an alternative way to do this?
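(Condensed from the SDK's dataflow_runner.py, DataflowPipelineResult.wait_until_finish; the exact code varies by SDK version:)

# on a non-success terminal state the runner raises instead of returning
if self.state != PipelineState.DONE:
    raise DataflowRuntimeException(
        'Dataflow pipeline failed. State: %s, Error:\n%s'
        % (self.state, getattr(self._runner, 'last_error_msg', None)),
        self)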

Edit: What my job actually does is scrape data from a website and push it to GCS. The failing part is always the scraping function, where the website occasionally returns a 403, and so far I have had to fix that by starting a new Dataflow job manually. I just want to use this check to start the new job automatically.
import argparse
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def scrape_data(site_id):
    # scrape data for the given id and return a JSON-serializable dict
    ...

def load_to_gcs(js):
    # dump the JSON to GCS
    ...

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(sys.argv)
    # parse pipeline parameters
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)
    id_list = ['aa', 'bb', 'cc']  # input params for scrape_data()
    ids = p | "init" >> beam.Create(id_list)
    js = ids | "scrape" >> beam.Map(scrape_data)
    js | "load to gcs" >> beam.Map(load_to_gcs)

    result = p.run()
    job_status = result.wait_until_finish()
    if job_status == "FAILED":
        # run something else >> call the Dataflow API to start a new job
        ...
When it fails (during scraping), the Dataflow job automatically retries another four times and then the job fails. But once it has failed, execution never goes into the if condition. The sketch below shows what I am considering instead.
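The workaround I am considering is to catch the exception instead of checking the return value; as far as I can tell DataflowRuntimeException lives in apache_beam.runners.dataflow.dataflow_runner, and restart_job() here is just a placeholder for my "call the Dataflow API" logic. Is this the intended pattern?

from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException

def restart_job():
    # placeholder: call the Dataflow API (or re-run this script) to launch a replacement job
    ...

result = p.run()
try:
    result.wait_until_finish()
    # reached only when the job finishes in the DONE state
except DataflowRuntimeException:
    # raised by the DataflowRunner when the terminal state is not DONE;
    # the DirectRunner re-raises the original user exception instead,
    # so a broader except would be needed there
    restart_job()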