
I'm trying to set up a Dataproc Serverless batch job from Google Cloud Composer using the DataprocCreateBatchOperator operator, passing some arguments that affect the underlying Python code. However, I'm running into the following error:

error: unrecognized arguments: --run_timestamp "2022-06-17T13:22:51.800834+00:00" --temp_bucket "gs://pipeline/spark_temp_bucket/hourly/" --bucket "pipeline" --pipeline "hourly"

This is how my operator is set up:

import random
import string

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

create_batch = DataprocCreateBatchOperator(
    task_id="hourly_pipeline",
    project_id="dev",
    region="us-west1",
    # Random 40-character batch ID built from lowercase letters, digits, and hyphens
    batch_id="".join(random.choice(string.ascii_lowercase + string.digits + "-") for i in range(40)),
    batch={
        "environment_config": {
            "execution_config": {
                "service_account": "<service_account>",
                "subnetwork_uri": "<uri>",
            }
        },
        "pyspark_batch": {
            "main_python_file_uri": "gs://pipeline/code/pipeline_feat_creation.py",
            "args": [
                '--run_timestamp "{{ ts }}"',
                '--temp_bucket "gs://pipeline/spark_temp_bucket/hourly/"',
                '--bucket "pipeline"',
                '--pipeline "hourly"',
            ],
            "jar_file_uris": [
                "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.0.jar"
            ],
        },
    },
)

Regarding the args array: I tried setting the parameters both with and without wrapping them in ". I had also already done a gcloud submit that worked, like so:

gcloud dataproc batches submit pyspark "gs://pipeline/code/pipeline_feat_creation.py" \
--batch=jskdnkajsnd-test-10 --region=us-west1 --subnet="<uri>" \
-- --run_timestamp "2020-01-01" --temp_bucket gs://pipeline/spark_temp_bucket/hourly/ --bucket pipeline --pipeline hourly
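
For reference, the shell splits those trailing arguments into separate tokens before they ever reach the script. A quick check with Python's shlex (illustrative only, not part of the original pipeline) shows what the driver receives from the working gcloud run:

import shlex

# The shell strips the quotes and splits on whitespace, so the driver
# sees each flag and each value as its own argv element:
print(shlex.split(
    '--run_timestamp "2020-01-01" --temp_bucket gs://pipeline/spark_temp_bucket/hourly/ '
    '--bucket pipeline --pipeline hourly'
))
# ['--run_timestamp', '2020-01-01', '--temp_bucket',
#  'gs://pipeline/spark_temp_bucket/hourly/', '--bucket', 'pipeline',
#  '--pipeline', 'hourly']

Each string in the operator's args list, by contrast, is passed through verbatim as a single argv element, quotes and all, which is consistent with the error above.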

1 Answer


The error I was running into was caused by not joining each flag and its value with a =; I've also dropped the " quoting around each value. This is how the args are now set up:

"args": [
    '--run_timestamp={{ ts }}',
    '--temp_bucket=gs://pipeline/spark_temp_bucket/hourly/',
    '--bucket=pipeline',
    '--pipeline=hourly'
]
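
This works because each string in args reaches the driver as exactly one argv element, and argparse accepts the --flag=value form as a single token. A minimal driver-side sketch, assuming the script parses its flags with argparse (the original pipeline_feat_creation.py isn't shown, so the parser setup here is hypothetical):

import argparse

# Hypothetical parser; the flag names are taken from the question.
parser = argparse.ArgumentParser()
parser.add_argument("--run_timestamp")
parser.add_argument("--temp_bucket")
parser.add_argument("--bucket")
parser.add_argument("--pipeline")

# The fixed form: one token per entry, flag and value joined by "=".
args = parser.parse_args([
    "--run_timestamp=2022-06-17T13:22:51.800834+00:00",
    "--temp_bucket=gs://pipeline/spark_temp_bucket/hourly/",
    "--bucket=pipeline",
    "--pipeline=hourly",
])
print(args.bucket)  # pipeline

# The original form fails: the embedded space and quotes make each entry
# a single token that matches no known flag, reproducing the
# "unrecognized arguments" error:
# parser.parse_args(['--run_timestamp "2022-06-17T13:22:51.800834+00:00"'])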