I'm stuck transforming an Azure HTTP-triggered function into something more robust that can run for more than 230 seconds.

I struggle with dividing the code into functions; I'm not sure how to construct the activity, orchestrator, and client functions in my case. I would really appreciate some help here.

The google_search module is defined as below:

from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging

def calculate_score(link, term):
    if term and term in link:
        return 100
    elif 'xxx' in link and 'yyy' in link:
        return 75
    elif 'xxx' in link:
        return 50
    elif link:
        return 25
    else:
        return None

def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    results = []
    error_values = {key: 'Error' for key in ['urls', 'score']}
    success = True
    error_code = 0
    for term in tqdm(search_terms):
        try:
            if term is None:
                row = {
                    'search_item': term,
                    'urls': [],
                    'score': []
                }
            else:
                result = service.cse().list(q=term, cx=cse_id, num=num_results, gl=country_code).execute()
                items = result.get('items', [])
                top_results = [item.get('link') for item in items[:num_results]]
                scores = [calculate_score(link, term) for link in top_results]

                row = {
                    'search_item': term,
                    'urls': top_results,
                    'score': scores
                }
                logging.info('Search completed successfully')
        except Exception as e:
            success = False
            error_code = 74
            row = {'search_item': term, 
                   **error_values}
            logging.error(f'An error occurred during calling the Search function. {e}')

        results.append(row)

    return success, results, error_code

The __init__.py function:

import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os

def main(req: func.HttpRequest) -> func.HttpResponse:
    params = {}  # defined up front so the except block can always reference it
    try:
        logging.info('Python HTTP trigger function processed a request.')
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')
        params = req_body.get('params', {})

        if not search_terms or not api_key or not cse_id:
            logging.error('Missing required parameters')
            return func.HttpResponse('Missing required parameters', status_code=400)

        success, results, error_code = search(search_terms=search_terms,
                                              num_results=num_results,
                                              country_code=country_code,
                                              api_key=api_key,
                                              cse_id=cse_id)

        response_data = {
            'success': int(success),
            'error_code': int(error_code),
            **params,
            'results': results
        }
        response_json = json.dumps(response_data)

        logging.info('API Call completed successfully')
        return func.HttpResponse(response_json, mimetype='application/json')
    
    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        error_code = 66
        response_data = {
            'success': 0,
            'error_code': int(error_code),
            **params
        }
        response_json = json.dumps(response_data)
        return func.HttpResponse(response_json, status_code=500, mimetype='application/json')

And a sample request:

{
  "search_terms": ["term1", "term2", "term3"],
  "num_results": 3,
  "params": {
    "search_id": "123",
    "engine_name": "Google Search"}   
}

Desired output example:

{
    "success": 1,
    "error_code": 0,
    "search_id": "123",
    "engine_name": "Google Search",
    "results": [
        {
            "search_item": "term1",
            "urls": [
                "https://sampleresult3.com",
                "https://sampleresult2.com",
                "https://sampleresult3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        },
        {
            "search_item": "term2",
            "urls": [
                "https://whatever1.com",
                "https://whatever.2.com",
                "https://whatever3.com"
            ],
            "score": [
                25,
                25,
                75
            ]
        },
        {
            "search_item": "term3",
            "urls": [
                "https://www.link1.com",
                "https://link2.com",
                "https://www.link3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        }
    ]
}

EDIT

I tried with the below activity function:

from google_search_scoring_clean import search
import os

def main(search_terms, num_results, country_code):
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    if not search_terms or not api_key or not cse_id:
        return False, []

    success, results = search(search_terms=search_terms,
                              num_results=num_results,
                              country_code=country_code,
                              api_key=api_key,
                              cse_id=cse_id)

    return success, results

but received an error message: Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: {'country_code', 'search_terms', 'num_results'}

After editing the function.json to

{
  "bindings": [
    {
      "name": "search_terms",
      "type": "string[]",
      "direction": "in"
    },
    {
      "name": "num_results",
      "type": "int",
      "direction": "in"
    },
    {
      "name": "country_code",
      "type": "string",
      "direction": "in"
    }
  ]
}

however, I receive:

The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.

EDIT2:

The below also won't work:

import os
from googleapiclient import discovery
import logging

def main(searchTerm: str) -> str:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if searchTerm is None:
            results = {
                'search_term': searchTerm,
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': searchTerm,
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': searchTerm, **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results

I adjusted the name from 'name' to 'searchTerm' in the function.json. The output is:

{
    "name": "Orchestrator",
    "instanceId": "4de8cc4818554208ad599e8687ca77a7",
    "runtimeStatus": "Running",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": null,
    "createdTime": "2023-05-31T10:37:24Z",
    "lastUpdatedTime": "2023-05-31T10:37:24Z"
}

EDIT3: It worked with the following adjustments

  1. In the function.json of the Activity Function I changed 'name' to activityVar; somehow it does not accept the name activity_var, and I have no idea why
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "activityVar",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

Orchestrator function:

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms= (requestBody['search_terms'])
    print("Orchestrator " + str(search_terms))
    tasks = []
    for search_term in search_terms:
        activity_var = {}
        activity_var['search_term'] = search_term
        activity_var['num_results'] = requestBody['num_results']
        activity_var['params'] = requestBody['params']
        print(activity_var)
        tasks.append(context.call_activity("ActivityFunction", activity_var))

    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)

Where my activity function folder is named "ActivityFunction".

Activity Function for now, as I have to prettify it:

import os
from googleapiclient import discovery
import logging

def main(activityVar: dict) -> dict:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if activityVar['search_term'] is None:
            results = {
                'search_term': activityVar['search_term'],
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=activityVar['search_term'], cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': activityVar['search_term'],
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': activityVar['search_term'], **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results

God, it's been a long day. Gotta wrap my head around it once again.

  • If the line where you call the search function is taking all of the time, then you would need to somehow split that into executable parts to get benefit of using Durable Functions. As it stands, it doesn't look like it would get any benefit. Commented May 30, 2023 at 9:38
  • In fact all I need is to allow it to run more than 230-240 seconds and durable functions seem to be one of the most 'popular' solutions. I simply need to avoid the timeout. Commented May 30, 2023 at 9:48
  • The way Durable Functions allow longer execution is by splitting the execution to activity functions. These activity functions still have the same timeout as functions normally have. If you only have one step that takes all the time, moving it to an activity does not change anything. Commented May 30, 2023 at 10:11
  • So I should somehow split the search_terms into chunks or even single queries so they can be conducted in the async way, right? Commented May 30, 2023 at 11:10
  • Yeah, exactly :) So that each activity function can run one chunk within the normal limits. Commented May 30, 2023 at 11:11

1 Answer

The pattern you need to follow is the fan-out/fan-in pattern. I won't write the full code for you, but you can follow the example given here. My response below should guide you in writing the code you need.

The aim is to split the search terms list into separate items so that you can trigger multiple activity functions, each of which searches for a single term independently. Since these activity functions are not HTTP-triggered, they are not bound by the 230s HTTP response limit.

Your HTTP-triggered function will look like this. It needs to pass the request body to the orchestrator so that you can split up the search terms there before calling the activity functions.

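import json
import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    # Forward the whole request body to the orchestrator as its input
    requestBody = json.loads(req.get_body().decode())
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    # Returns immediately with the status/management links for this instance
    return client.create_check_status_response(req, instance_id)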

Your orchestrator then builds a dictionary for each search term and passes it as the single input to an activity function. The only difference is that each activity function receives just one search term. You get back a list in results, which you can format as needed before returning it.

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    print("Orchestrator " + str(search_terms))
    tasks = []
    # Fan out: schedule one activity per search term
    for search_term in search_terms:
        activity_var = {}
        activity_var['search_term'] = search_term
        activity_var['num_results'] = requestBody['num_results']
        activity_var['params'] = requestBody['params']
        print(activity_var)
        tasks.append(context.call_activity("Activity", activity_var))

    # Fan in: wait for all activities and collect their results in order
    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)
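
To get from the plain list in results to the response shape shown in the question, the orchestrator could pass it through a small helper before returning. The following is only a sketch: build_response is a hypothetical name, and it assumes a failed activity reports the string 'Error' in place of its URL list (as the activity code in the question does) and reuses the question's error code 74.

```python
def build_response(request_body: dict, results: list) -> dict:
    """Merge per-term activity results into the response shape from the question."""
    # Assumption: a failed activity returns 'Error' instead of a list of URLs.
    failed = any(r.get('urls') == 'Error' for r in results)
    return {
        'success': int(not failed),
        'error_code': 74 if failed else 0,  # 74 is the search error code used in the question
        **request_body.get('params', {}),   # e.g. search_id, engine_name
        'results': results,
    }
```

In the orchestrator this would replace the last line of the function body with something like return build_response(requestBody, results).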

Finally, your activity function will hold the main logic to perform the search and return the results for a single search term.
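
As a rough sketch of what such an activity might look like: the version below separates the per-term logic (search_one, a hypothetical helper) from the Google client call so the logic can be tried locally without credentials. The environment variable names apikey and cseid come from the question; the binding name activityVar is an assumption and has to match the name in the activity's function.json.

```python
import logging
import os


def search_one(activity_var: dict, fetch_links) -> dict:
    """Handle one search term; fetch_links(term, num_results) returns a list of URLs."""
    term = activity_var['search_term']
    num_results = int(activity_var.get('num_results', 5))
    if term is None:
        return {'search_term': term, 'urls': []}
    try:
        return {'search_term': term, 'urls': fetch_links(term, num_results)}
    except Exception as e:
        # Report the failure in the result instead of failing the whole orchestration
        logging.error(f'An error occurred during the search: {e}')
        return {'search_term': term, 'urls': 'Error'}


def main(activityVar: dict) -> dict:
    def fetch_links(term, num_results):
        # Imported lazily so this module still loads where the Google client is not installed
        from googleapiclient import discovery
        service = discovery.build('customsearch', 'v1',
                                  developerKey=os.getenv('apikey'),
                                  cache_discovery=False)
        result = service.cse().list(q=term, cx=os.getenv('cseid'),
                                    num=num_results).execute()
        return [item.get('link') for item in result.get('items', [])]

    return search_one(activityVar, fetch_links)
```

Because the dictionary arrives as one input, everything the activity needs (term, num_results, params) is unpacked inside it rather than declared as separate bindings.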

One important point to remember: since this entire process is asynchronous, calling the HTTP starter function immediately returns a dictionary of management links while the actual process runs in the background. You will need to poll the "statusQueryGetUri" link at fixed or exponentially increasing intervals to get the status of the execution. Once "runtimeStatus" is "Completed", you will find your result in the "output" field.
Below is an example response from the "statusQueryGetUri" link.

{
    "name": "Orchestrator1",
    "instanceId": "1a98f11135494cf88fa1d3241b8cc4f3",
    "runtimeStatus": "Completed",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": [
        "Hello {'search_term': 'term1', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term2', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term3', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!"
    ],
    "createdTime": "2023-05-30T12:35:22Z",
    "lastUpdatedTime": "2023-05-30T12:35:24Z"
}
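
On the client side, polling with exponential backoff could look roughly like the sketch below. It uses only the standard library; wait_for_output and fetch_status are hypothetical names, and the delays and timeout are arbitrary defaults that you would tune. The status URL is whatever "statusQueryGetUri" the starter returned.

```python
import json
import time
import urllib.request


def fetch_status(url: str) -> dict:
    """GET the status document from the statusQueryGetUri link."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode())


def wait_for_output(status_url, fetch=fetch_status, initial_delay=1.0,
                    max_delay=30.0, timeout=900.0, sleep=time.sleep):
    """Poll until the orchestration completes and return its output."""
    delay = initial_delay
    waited = 0.0
    while True:
        status = fetch(status_url)
        state = status.get('runtimeStatus')
        if state == 'Completed':
            return status.get('output')
        if state in ('Failed', 'Terminated', 'Canceled'):
            raise RuntimeError(f'Orchestration ended with status {state}')
        if waited >= timeout:
            raise TimeoutError(f'Orchestration still {state} after {waited:.0f}s')
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # double the wait, up to max_delay
```

The fetch and sleep parameters are there only so the loop can be exercised without a running function app; in normal use you would call wait_for_output(status_url) and leave the defaults.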
18 Comments

Thank you for pointing out to the fan out fan in pattern. However I still have no idea how to adjust my activity function. Any samples I try fail during the local testing...
Did you already refer to the link I provided? An example activity would be learn.microsoft.com/en-us/azure/azure-functions/durable/…. Whatever you have already written will need to go in this activity. And you need to return the response you get back for that particular search item. Your orchestrator can then merge all the responses together. What error is your activity failing with ? Maybe update your question with your new code and the new error.
I added the function to the question - followed by other workarounds and outputs. It seems that I simply don't get the point of the durable functions; never dealt with such a tool before and really struggle with the solution.
As far as I know, you cannot send multiple parameters from the orchestrator to an activity. If you refer to my orchestrator, you can see that I'm creating one dictionary variable called activity_var and passing only that to the activity. There is no need to change the binding. You can then unpack it in the activity itself. You can refer to this answer, which addresses the problem in the same way: stackoverflow.com/questions/65231702/….
Thank you for the provided link - I will try to somehow adjust the activity function, however as my deadline is in like 2 days I will probably need to experiment either with the requests sent in chunks to avoid exceeding the time limit or some 'keep_alive' function that will send requests every x seconds - durable functions seem to be too hard for me at the moment with the search function I constructed.