The code below works, but get_host_types() iterates over the metrics in a for loop. Instead, I want get_host_types() to spawn one subtask per metric, each calling the Celery task get_host_type(), so that the subtasks can run independently on worker nodes. get_host_types() should then wait for the results and return them. I was thinking of using group(), but I cannot call .get() on the resulting AsyncResult from inside a task. If I don't parallelize, I am not using the distributed task framework to speed up the main task.

from __future__ import print_function


from celery import Celery, group
import requests

app = Celery('celery_test')

app.config_from_object('config')

@app.task
def get_host_type(metric, alert):
    host_types = get_host_types_for_alert(alert['alert_id'], metric)
    return host_types

class MyObject(dict):
    def __init__(self, alert, host_types):
        dict.__init__(self, alert=alert, host_types=host_types)


@app.task(serializer='json')
def get_host_types(my_obj):
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    ret_val = set()
    for m in metrics:
        res = alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], m)
        ret_val.update(res)
    print(f"Return value ======> {ret_val}")
    return list(ret_val)


@app.task
def get_metrics(alert):
    print(f" alert ==> {alert}")
    #alert = alert[0]
    metric = alerts_client.get_metrics(alert['alert'])
    metrics = alert_id_host_type_mapper.metric_parser(metric)
    return MyObject(alert, metrics)


@app.task
def get_alert(alert_id):
    print(f" =====> alert id {alert_id}")
    return alerts_client.get_alerts(alert_id)


if __name__ == "__main__":
    res = (get_alert.s(267483) | get_metrics.s() | get_host_types.s()).apply_async()
    print(res.get())

Edit: If I do result.get() in a subtask I get the following error.

[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
    result_int = res.get()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
    on_interval=on_interval,
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
    assert_will_not_block()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
  • Just to clarify: you want to call get_host_type(metric, alert) subtasks from the get_host_types(my_obj) task and collect the results of all the subtasks? Commented Feb 15, 2022 at 18:04
  • @dassum Yes. I have updated the post above with the exception I get when I call result.get() from a subtask. Commented Feb 15, 2022 at 19:11

1 Answer

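You can use a Celery group, as shown below. Passing disable_sync_subtasks=False to get() suppresses the RuntimeError from the question, although the calling task still occupies a worker while it waits for the subtasks: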

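@app.task(serializer='json')
def get_host_types(my_obj):
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # Build one get_host_type signature per metric. This passes the alert ID,
    # so get_host_type must accept an ID (not the alert dict) as its second argument.
    job = group(get_host_type.s(m, alert['id']) for m in metrics)
    result = job.apply_async()
    # Allow a blocking get() inside the task; returns one result per subtask.
    per_metric = result.get(disable_sync_subtasks=False)
    # Merge and de-duplicate, as the original for loop did.
    ret_val = set()
    for host_types in per_metric:
        ret_val.update(host_types)
    return list(ret_val)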

For more information on Celery groups, see http://ask.github.io/celery/userguide/groups.html#groups