I am using a Cloud Composer environment to run workflows in a GCP project. One of my workflows creates a Dataproc cluster in a different project using the DataprocClusterCreateOperator, and then attempts to submit a PySpark job to that cluster using the DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module.

When creating the cluster, I can pass a project_id parameter to place it in another project, but DataProcPySparkOperator seems to ignore this parameter. For example, I expect to be able to pass a project_id, but the task fails with a 404 error at runtime:

from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t1 = DataProcPySparkOperator(
    task_id='submit_pyspark_job',
    project_id='my-gcp-project',  # appears to be ignored
    main='...',
    arguments=[...],
)

How can I use DataProcPySparkOperator to submit a job in another project?

1 Answer

The DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module doesn't accept a project_id kwarg in its constructor, so it always submits Dataproc jobs to the project the Cloud Composer environment itself is in. If the argument is passed, it is ignored, and the task fails with a 404 error because the operator polls for the job under an incorrect cluster path (in the environment's project, where the cluster doesn't exist).

One workaround is to copy the operator and its hook and modify them to accept a project ID. An easier solution, however, is to use the newer operators from the airflow.providers packages, provided you are running a version of Airflow that supports them, since many airflow.contrib operators are deprecated in newer Airflow releases.

Below is an example. Note that this module also contains a newer DataprocSubmitPySparkJobOperator, but it is itself deprecated in favor of DataprocSubmitJobOperator, so use the latter, which accepts a project ID.

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

t1 = DataprocSubmitJobOperator(
    task_id='submit_job',
    project_id='my-gcp-project-id',
    location='us-central1',
    job={...},
)
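
The job argument follows the Dataproc Job resource format from the Dataproc API. As a minimal sketch for running a plain PySpark script, it might look like the following; the cluster name, GCS path, and task ID are placeholder values, not taken from the question:

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Placeholder values below; adjust the project, cluster name, and GCS path to your setup.
PYSPARK_JOB = {
    'reference': {'project_id': 'my-gcp-project-id'},
    'placement': {'cluster_name': 'my-cluster'},  # the cluster created in the other project
    'pyspark_job': {'main_python_file_uri': 'gs://my-bucket/path/to/script.py'},
}

submit_pyspark = DataprocSubmitJobOperator(
    task_id='submit_pyspark',
    project_id='my-gcp-project-id',  # project that owns the Dataproc cluster
    location='us-central1',          # region where the cluster runs
    job=PYSPARK_JOB,
)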

If your environment runs Composer 1.10.5+ with Airflow 1.10.6+ and Python 3, the backport provider packages are preinstalled and can be used immediately.


1 Comment

If it is a simple Python script [on a Google Dataproc cluster] that we wish to run, how should the "job" argument be defined in DataprocSubmitJobOperator? Is this even possible?
