I am getting started with Celery and Python, and I have a question that is probably very simple, but I don't seem to be able to find any suitable answer around...

If I have a bunch of tasks and one of them throws an exception, is there a way of retrieving the arguments that were passed to said task?

For instance, if I want to get the IPs some hostnames resolve to, and I create a task...

@tasks_app.task
def resolve_hostname(hostname):
    return (hostname, {hst.address for hst in dns.resolver.query(hostname)})

... which can throw an exception, is there a way of getting the value of that hostname argument outside the call when that Exception happens?

Let's say I group the tasks like:

ip_subtasks = group(
    resolve_hostname.s(hostname) for hostname in ['google.com',
                                                  'yahoo.com',
                                                  'failure.kommm']
)()

The last one (which tries to resolve failure.kommm) will raise an exception. I'd like to put the get() call on the Celery result inside a try/except, and show a message saying "Something went wrong when trying to resolve failure.kommm" (something like what is shown below):

for ip_subtask in ip_subtasks:
    try:
        hostname, ips = ip_subtask.get(timeout=45)
    except dns.exception.DNSException as e:
        # I WISH THIS WORKED:
        logger.exception("Something happened when trying"
                         " to resolve %s" % ip_subtask.args[0])

So, that's the question... Is there a way of retrieving the arguments a task was executed with if I have the task instance itself?

Thank you in advance.

  • Have you looked at an on_failure handler? celery.readthedocs.org/en/latest/userguide/… Commented Oct 6, 2014 at 20:33
  • I have, and I think it would work, but I don't know how to use it when the task has been created through a decorator as shown above (as I said, I'm a newbie with Celery). Commented Oct 6, 2014 at 21:06
  • Check out the section above handlers, it shows you how to do just that with an abstract class. Commented Oct 6, 2014 at 21:16

1 Answer

To do this you can use an abstract class to implement an on_failure handler.

import logging

import dns.resolver
from celery import Task

logger = logging.getLogger(__name__)

class DebugTask(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.exception("Something happened when trying"
                         " to resolve %s" % args[0])

@tasks_app.task(base=DebugTask)
def resolve_hostname(hostname):
    return (hostname, {hst.address for hst in dns.resolver.query(hostname)})

From the docs:

on_failure(self, exc, task_id, args, kwargs, einfo)

Parameters: 
  exc     – The exception raised by the task.
  task_id – Unique id of the failed task.
  args    – Original arguments for the task that failed.
  kwargs  – Original keyword arguments for the task that failed.
  einfo   – ExceptionInfo instance, containing the traceback.
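
One caveat with reading args[0] in the handler: if the task was queued with a keyword argument (for example resolve_hostname.delay(hostname='failure.kommm')), args is empty and the value is in kwargs instead. The following is a minimal sketch of a handler that covers both cases. The names first_argument and HostnameFailureMixin are hypothetical, not part of Celery, and the block deliberately avoids importing Celery so it can be run on its own; the assumption is that the mixin would be combined with the real base class, e.g. class DebugTask(HostnameFailureMixin, Task).

```python
import logging

logger = logging.getLogger(__name__)


def first_argument(args, kwargs, name, default="<unknown>"):
    """Return a task's first argument, whether passed positionally or by keyword.

    Hypothetical helper: `name` is the parameter name to look up in kwargs
    when no positional arguments were given.
    """
    if args:
        return args[0]
    return kwargs.get(name, default)


class HostnameFailureMixin:
    """Hypothetical mixin providing an on_failure hook with the signature
    documented above. Assumed usage: class DebugTask(HostnameFailureMixin, Task).
    """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        hostname = first_argument(args, kwargs, "hostname")
        # Pass exc_info explicitly instead of relying on logger.exception(),
        # since the hook is not guaranteed to run inside an except block.
        logger.error(
            "Task %s failed while resolving %s: %r",
            task_id,
            hostname,
            exc,
            exc_info=(type(exc), exc, exc.__traceback__),
        )
```

Calling the hook by hand, e.g. HostnameFailureMixin().on_failure(ValueError("boom"), "some-id", (), {"hostname": "failure.kommm"}, None), logs the hostname even though args is empty.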

3 Comments

on_failure doesn't catch TimeLimitExceeded.
The section of the docs immediately following the one linked for on_failure describes how to use a custom Request to handle on_timeout with a pre-forking worker. Otherwise, you have to handle that yourself in the task itself.
What is the difference between on_failure (in Task and Request) and on_timeout?
