
I am using PostgreSQL as both the broker and result backend for Celery (v5.3.5).

Celery returns an SQL error when I call ready() on the task's AsyncResult:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'AsyncResult'
[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback, celery_taskmeta.name AS celery_taskmeta_name, celery_taskmeta.args AS celery_taskmeta_args, celery_taskmeta.kwargs AS celery_taskmeta_kwargs, celery_taskmeta.worker AS celery_taskmeta_worker, celery_taskmeta.retries AS celery_taskmeta_retries, celery_taskmeta.queue AS celery_taskmeta_queue 
FROM celery_taskmeta 
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': <AsyncResult: 0ae578c2-85b2-4d13-9002-50604329a480>}]
(Background on this error at: https://sqlalche.me/e/20/f405)

There is quite a long traceback, mostly SQLAlchemy; the last Celery frame was:
  File "/dist-packages/celery/backends/database/__init__.py", line 152, in _get_task_meta_for
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))

This is the task worker script (called task_queue.py):

from time import sleep

from celery import Celery

broker = 'sqla+postgresql://user:password@server/db'
backend = 'db+postgresql://user:password@server/db'
app: Celery = Celery('tasks', broker=broker, backend=backend)

@app.task(bind=True)
def long_running_task(self, seconds: int):
    """A task that takes a number of seconds to complete."""
    print('Starting count off of {0} seconds'.format(seconds))
    for i in range(seconds):
        print('{0} seconds left'.format(seconds - i))
        sleep(1)

and this is how it is called:

from time import sleep

from celery.result import AsyncResult
from task_queue import long_running_task

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task: AsyncResult = AsyncResult(id, app=long_running_task.app)

while not task.ready():
    print("Waiting for task to complete")
    sleep(1)

I cannot see what I am doing wrong here; this same code works if I use rpc as the backend. Clearly the SQL call from Celery expects a task id but is getting an AsyncResult instead. Is this a bug in Celery? Any ideas much appreciated.

(Addendum: the same error occurs when trying to get any information from the task result, e.g. task.name, task.result, task.args.)

  • Did you try removing bind=True and just using @app.task? You really do not need it for that particular task. Commented Nov 21, 2023 at 9:47
  • Thanks, this is just example code to show the error; I will need bind elsewhere. However, I tried what you suggested as an experiment and got the same result. Commented Nov 21, 2023 at 10:48

1 Answer


Came across the answer entirely by accident while reading comments here: How to check task status in Celery?

It turns out that calling delay() already returns an AsyncResult. In the original code that result was passed, instead of a plain task-id string, as the task_id argument of a second AsyncResult, so the backend query received the object itself rather than its id. The f-string only appeared to work because AsyncResult's string representation is the task id.
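To illustrate the mechanism, here is a minimal sketch (a hypothetical stand-in class, not Celery itself) of why string formatting shows a clean id while the database driver chokes on the object:

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    def __init__(self, task_id):
        self.id = task_id

    def __str__(self):
        # AsyncResult stringifies to the task id, which is why
        # f"Task ID {id} queued" printed a plain UUID.
        return self.id


result = FakeAsyncResult("0ae578c2-85b2-4d13-9002-50604329a480")

# f-strings call __str__, so this looks like a bare id:
print(f"Task ID {result} queued")
# prints: Task ID 0ae578c2-85b2-4d13-9002-50604329a480 queued

# ...but the object itself is not a string, which is what psycopg2
# needs when the value is bound as a query parameter:
assert str(result) == "0ae578c2-85b2-4d13-9002-50604329a480"
assert not isinstance(result, str)
```

Passing result where a task-id string is expected is exactly the "can't adapt type 'AsyncResult'" situation in the question.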

Hence the fix is simply to use the object returned by delay() directly; there is no need to construct a second AsyncResult:

Changing:

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task: AsyncResult = AsyncResult(id, app=long_running_task.app)

to:

task = long_running_task.delay(5)
print(f"Task ID {task} queued, state {task.state}, task_id type {type(task)}")

Works!
