
I have a batch processing job running on Dataflow in GCP, using apache-beam[gcp]==2.19.0 with the Dataflow runner. I created a custom template for the job. The job runs as expected, but I also want to enforce a maximum job duration. I found the duration parameter (in milliseconds) of the wait_until_finish() method, which should be available. My question is: how can I make the templated batch job stop automatically when it runs longer than duration? I do not need to keep any data; I just want the job to stop when it runs too long. I've implemented the run function as follows:

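import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider

# UserOptions, doStuff and outputData are defined elsewhere in the job.

def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)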

    (p |
     "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,
                                                        use_standard_sql=StaticValueProvider(bool, True))) |
     "Get data" >> beam.ParDo(doStuff()) |

     "Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |

     "Write to BQ" >> beam.io.WriteToBigQuery(
                table=user_options.table_spec,
                schema=user_options.table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
     )

    result = p.run()

    result.wait_until_finish(duration=1800000)  # 1800000 ms = 30 minutes

1 Answer


No, Dataflow does not automatically cancel a job after a certain period. You can still achieve your goal by calling cancel() once wait_until_finish() returns:

    result.wait_until_finish(duration=1800000)  # returns after at most 30 minutes
    if not result.is_in_terminal_state():  # the pipeline isn't finished, so cancel it
        result.cancel()
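The same wait-then-cancel pattern can be wrapped in a small helper. This is a minimal sketch, not part of the Beam API: cancel_if_running is a hypothetical name, and the helper only assumes the result object exposes wait_until_finish(duration=...), is_in_terminal_state() and cancel(), as used in the snippet above. The FakeResult class is likewise hypothetical; it exists only so the sketch runs without Apache Beam or GCP.

```python
class FakeResult:
    """Hypothetical stand-in for a Dataflow pipeline result, for illustration only."""

    def __init__(self, finishes_after_ms):
        self.finishes_after_ms = finishes_after_ms
        self.state = "RUNNING"

    def wait_until_finish(self, duration=None):
        # Simulated wait: the job finishes only if it needs no more than `duration` ms.
        if duration is None or self.finishes_after_ms <= duration:
            self.state = "DONE"
        return self.state

    def is_in_terminal_state(self):
        return self.state in ("DONE", "FAILED", "CANCELLED")

    def cancel(self):
        self.state = "CANCELLED"
        return self.state


def cancel_if_running(result, duration_ms):
    """Wait up to duration_ms milliseconds, then cancel the job if it is still running.

    Returns True if cancel() was called, False if the job finished on its own.
    """
    result.wait_until_finish(duration=duration_ms)
    if not result.is_in_terminal_state():
        result.cancel()
        return True
    return False


MAX_DURATION_MS = 30 * 60 * 1000  # 30 minutes, the 1800000 ms used in the question

fast_job = FakeResult(finishes_after_ms=10 * 60 * 1000)
slow_job = FakeResult(finishes_after_ms=45 * 60 * 1000)
print(cancel_if_running(fast_job, MAX_DURATION_MS), fast_job.state)  # False DONE
print(cancel_if_running(slow_job, MAX_DURATION_MS), slow_job.state)  # True CANCELLED
```

In the real job you would pass the object returned by p.run() instead of a FakeResult, e.g. cancel_if_running(p.run(), 1800000).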

2 Comments

@Pablo, is it possible to do the same if I use the context-manager form?

    with beam.Pipeline() as pipeline:
        pass  # build your pipeline here
As far as I know, this is not possible: on exit, the with block calls run() and then wait_until_finish() without arguments, so you have no way of passing the duration parameter.
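One workaround for the with form is to replace the pipeline's own run-and-wait on exit with your own context manager. This is a hypothetical sketch, not a Beam feature: pipeline_with_timeout is an illustrative name, and it only assumes the wrapped object has run() returning a result with wait_until_finish(duration=...), is_in_terminal_state() and cancel(). The pipeline must not also be used as its own context manager, otherwise it would be run twice. FakePipeline and FakeResult are hypothetical stand-ins so the sketch runs without Beam.

```python
from contextlib import contextmanager


@contextmanager
def pipeline_with_timeout(pipeline, duration_ms):
    """Yield `pipeline` for construction; on a clean exit, run it, wait up to
    duration_ms milliseconds, and cancel it if it is still running."""
    yield pipeline
    # Only reached when the with-body raised no exception, so a half-built
    # pipeline is never submitted.
    result = pipeline.run()
    result.wait_until_finish(duration=duration_ms)
    if not result.is_in_terminal_state():
        result.cancel()


class FakeResult:
    """Hypothetical stand-in for a pipeline result."""

    def __init__(self, finishes_after_ms):
        self.finishes_after_ms = finishes_after_ms
        self.state = "RUNNING"

    def wait_until_finish(self, duration=None):
        if duration is None or self.finishes_after_ms <= duration:
            self.state = "DONE"
        return self.state

    def is_in_terminal_state(self):
        return self.state in ("DONE", "FAILED", "CANCELLED")

    def cancel(self):
        self.state = "CANCELLED"
        return self.state


class FakePipeline:
    """Hypothetical stand-in for beam.Pipeline; run() returns a preset result."""

    def __init__(self, finishes_after_ms):
        self.result = FakeResult(finishes_after_ms)
        self.steps = []

    def run(self):
        return self.result


with pipeline_with_timeout(FakePipeline(45 * 60 * 1000), 30 * 60 * 1000) as p:
    p.steps.append("Read data")  # build the pipeline here
print(p.result.state)  # CANCELLED
```

With Beam, the call would look like with pipeline_with_timeout(beam.Pipeline(options=opts), 1800000) as p:, followed by the same transforms as in the question.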
