
I have a Cloud Function that is triggered by Cloud Pub/Sub. I want the same function to trigger a Dataflow pipeline using the Python SDK. Here is my code:

import base64
def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

I deploy the function this way:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1
  • Where is the code of your Dataflow pipeline located? In a template? In an external file? Commented Oct 28, 2019 at 12:59
  • In a Python file. I work in Google Cloud Shell. Commented Oct 28, 2019 at 13:02

2 Answers


You have to embed your pipeline's Python code with your function. When your function is called, you simply call the pipeline's main function, which executes the pipeline defined in your file.

If you developed and tested your pipeline in Cloud Shell and have already run it as a Dataflow pipeline, your code should have this structure:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None, save_main_session=True):
  # Parse the arguments and set the pipeline options
  options = PipelineOptions(argv)
  # Build the Pipeline in the p variable
  p = beam.Pipeline(options=options)
  # Perform your transforms in the Pipeline
  # ...
  # Run your Pipeline
  result = p.run()
  # Wait for the end of the pipeline
  result.wait_until_finish()

Then call this function with the correct arguments, especially runner=DataflowRunner, so that the Python code launches the pipeline on the Dataflow service.

Finally, delete the result.wait_until_finish() call, because your function won't live for the whole duration of the Dataflow job.
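For instance, here is a minimal sketch of the function, assuming the pipeline file above is deployed alongside the function as pipeline.py (a hypothetical name) and that its run() function accepts a standard argv list; the project, region and bucket values are placeholders:

import base64
import pipeline  # hypothetical module: your pipeline file deployed with the function

def hello_pubsub(event, context):
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

    # Launch the pipeline on the Dataflow service; all values below are placeholders.
    # Remember to remove result.wait_until_finish() from run(), as explained above.
    pipeline.run(argv=[
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=europe-west1',
        '--temp_location=gs://my-bucket/tmp',
    ])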

You can also use a template if you want.


3 Comments

I'm running it with DirectRunner. In fact I have multiple Python files, and each file contains a different pipeline. I want to write one cloud function that runs a pipeline each time it is triggered. I also tried to launch a subprocess of the command line that executes my pipeline, this way: tmp = subprocess.run(["python", "./defaultTrigger.py --network 'test' --input_topic 'projects/...../suscriptions/sub1' -output_topic 'projects/.../topics/topic2'"]) print(tmp), but it did not work. I'll try your solution, thanks a lot
1/2 @Rim, take care not to mix things up. Dataflow is a platform, Beam is a framework. When you run your Beam pipeline with DirectRunner, you don't use the Dataflow platform to execute it; you use the current system, in your case the function environment, to run the Beam code. That's not recommended, except if you set up 2 GB of memory to get the highest CPU power for your Beam pipeline.
2/2 The other part is your wish to start a subprocess in the function and call "python" in it. Remember, you are in a serverless architecture: you don't know what the underlying server, OS and platform are. Spawning a python subprocess can lead to unexpected things (Python 2 or 3? Which dependencies?...) even if the call is accepted. In addition, you are billed for the request processing time; if you fork the process and run a background thread, the billing is not fair. That's why this kind of operation is forbidden. I recommend you redesign your app.

You can use Cloud Dataflow templates to launch your job. You will need to code the following steps:

  • Retrieve credentials
  • Generate Dataflow service instance
  • Get GCP PROJECT_ID
  • Generate template body
  • Execute template

Here is an example using your base code (feel free to split it into multiple methods to reduce the code inside the hello_pubsub method).

from googleapiclient.discovery import build
import base64
import google.auth
import os

def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'

    # Authenticate with the function's default credentials and build the Dataflow API client
    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    gcp_project = os.environ["GCLOUD_PROJECT"]

    # GCS path of the staged Dataflow template
    template_path = "gs://template_file_path_on_storage/"
    template_body = {
        "parameters": {
            "keyA": "valueA",
            "keyB": "valueB",
        },
        "environment": {
            "envVariable": "value"
        }
    }

    request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
    response = request.execute()

    print(response)

In the template_body variable, the parameters values are the arguments that will be sent to your pipeline, and the environment values are used by the Dataflow service (service account, workers and network configuration).
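On the pipeline side, a templated pipeline typically reads these runtime parameters through the ValueProvider interface, roughly like this (TemplateOptions and the option names are only illustrative and must match the keys you send in parameters):

from apache_beam.options.pipeline_options import PipelineOptions

class TemplateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameters, matching the keys sent in the "parameters" block
        parser.add_value_provider_argument('--keyA', type=str)
        parser.add_value_provider_argument('--keyB', type=str)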

LaunchTemplateParameters documentation

RuntimeEnvironment documentation

6 Comments

The parameters in the template body would be the arguments needed to run the Dataflow job, right? If the Dataflow job takes its input from a GCS location, then parameters should contain "input": "GCS location", right?
@PriyaAgarwal Yes, you are right. The parameters sent via the template need to be retrieved in the job using the ValueProvider interface, which allows pipelines to accept runtime parameters. beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/… beam.apache.org/releases/pydoc/2.19.0/…
Quick question- is a new dataflow job created every time the cloud function is triggered?
@MineshBarot yes
Getting an error when adding "parameters" like below: parameters = { 'input': inputFile, 'output': outputFile, }
