You can use Cloud Dataflow templates to launch your job. Your code needs to perform the following steps:
- Retrieve credentials
- Generate Dataflow service instance
- Get GCP PROJECT_ID
- Generate template body
- Execute template
Here is an example based on your code (feel free to split it into multiple methods to reduce the amount of code inside `hello_pubsub`):
```python
from googleapiclient.discovery import build
import base64
import google.auth
import os


def hello_pubsub(event, context):
    # Decode the Pub/Sub message payload, if any.
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'

    # Retrieve the function runtime's default credentials.
    credentials, _ = google.auth.default()

    # Build a Dataflow service client.
    service = build('dataflow', 'v1b3', credentials=credentials)

    # Get the GCP project ID from the environment.
    gcp_project = os.environ["GCLOUD_PROJECT"]

    # GCS path to your template file (placeholder; note the quotes).
    template_path = "gs://template_file_path_on_storage/"

    template_body = {
        "parameters": {
            "keyA": "valueA",
            "keyB": "valueB",
        },
        "environment": {
            "envVariable": "value"
        }
    }

    # Launch the templated Dataflow job.
    request = service.projects().templates().launch(
        projectId=gcp_project, gcsPath=template_path, body=template_body)
    response = request.execute()
    print(response)
```
In the `template_body` variable, the `parameters` values are the arguments passed to your pipeline, while the `environment` values are used by the Dataflow service itself (service account, worker, and network configuration).
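For reference, here is a sketch of what a more concrete request body could look like. The field names (`jobName`, `serviceAccountEmail`, `maxWorkers`, `tempLocation`, `network`) come from the LaunchTemplateParameters and RuntimeEnvironment resources linked below; all values are placeholders, and the `parameters` keys depend entirely on what your template defines.

```python
# Illustrative request body; every value below is a placeholder.
template_body = {
    "jobName": "my-templated-job",  # optional name for the launched job
    "parameters": {
        # Pipeline-specific options defined by your template.
        "inputTopic": "projects/my-project/topics/my-topic",
        "outputTable": "my-project:my_dataset.my_table",
    },
    "environment": {
        # RuntimeEnvironment fields consumed by the Dataflow service.
        "serviceAccountEmail": "worker-sa@my-project.iam.gserviceaccount.com",
        "maxWorkers": 3,
        "tempLocation": "gs://my-bucket/temp",
        "network": "default",
    },
}
```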
- LaunchTemplateParameters documentation
- RuntimeEnvironment documentation
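If you want to smoke-test the function before deploying it, you can call it with a hand-built event. The sketch below is only illustrative: it assumes application-default credentials and the `GCLOUD_PROJECT` variable are available in your local environment, and the message payload is arbitrary.

```python
# Minimal local smoke test (assumes application-default credentials
# and GCLOUD_PROJECT are set in the environment).
import base64

fake_event = {"data": base64.b64encode(b"launch please").decode("utf-8")}
hello_pubsub(fake_event, None)  # context is unused above, so None is fine
```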