I am using PostgreSQL as the result backend for Celery (v5.3.5).
Celery raises an SQL error when I call ready() on the task's AsyncResult:
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'AsyncResult'
[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback, celery_taskmeta.name AS celery_taskmeta_name, celery_taskmeta.args AS celery_taskmeta_args, celery_taskmeta.kwargs AS celery_taskmeta_kwargs, celery_taskmeta.worker AS celery_taskmeta_worker, celery_taskmeta.retries AS celery_taskmeta_retries, celery_taskmeta.queue AS celery_taskmeta_queue
FROM celery_taskmeta
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': <AsyncResult: 0ae578c2-85b2-4d13-9002-50604329a480>}]
(Background on this error at: https://sqlalche.me/e/20/f405)
There is quite a long traceback, mostly SQLAlchemy; the last Celery frame was:
  File "/dist-packages/celery/backends/database/__init__.py", line 152, in _get_task_meta_for
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
This is the task worker script (called task_queue.py):
from time import sleep

from celery import Celery, current_app, Task

broker = 'sqla+postgresql://user:password@server/db'
backend = 'db+postgresql://user:password@server/db'
app: Celery = Celery('tasks', broker=broker, backend=backend)

@app.task(bind=True)
def long_running_task(self, seconds: int):
    """ A task that takes a number of seconds to complete. """
    print('Starting count off of {0} seconds'.format(seconds))
    for i in range(seconds):
        print('{0} seconds left'.format(seconds - i))
        sleep(1)
and this is how it is called:
from time import sleep

from task_queue import long_running_task
from celery.result import AsyncResult

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task: AsyncResult = AsyncResult(id, app=long_running_task.app)
while not task.ready():
    print("Waiting for task to complete")
    sleep(1)
I cannot see what I am doing wrong here; this same code works if I use rpc as the backend. Clearly the SQL query issued by Celery expects a task id but is getting an AsyncResult instead. Is this a bug in Celery? Any ideas much appreciated.
(Addendum: the same error occurs when trying to read any information from the task result, e.g. task.name, task.result, task.args.)
bind=True and just have @app.task? - You really do not need it for that particular task.
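One way to picture the mismatch described in the question: Celery's delay() returns an AsyncResult rather than a bare id string, and a database driver can bind a string as a query parameter but not an arbitrary Python object. The sketch below is only a stand-in, not Celery code. It assumes the standard-library sqlite3 module in place of psycopg2, and FakeAsyncResult, lookup_status and the taskmeta table are hypothetical names made up for illustration.

```python
import sqlite3


class FakeAsyncResult:
    """Hypothetical stand-in for a result handle that carries a task id."""

    def __init__(self, task_id: str):
        self.id = task_id

    def __repr__(self) -> str:
        return f"<FakeAsyncResult: {self.id}>"


def lookup_status(conn: sqlite3.Connection, task_id):
    """Look up a task's status; the driver has to bind task_id as a parameter."""
    row = conn.execute(
        "SELECT status FROM taskmeta WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row[0] if row else None


# Hypothetical in-memory table, loosely modelled on a task metadata table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taskmeta (task_id TEXT PRIMARY KEY, status TEXT)")
conn.execute(
    "INSERT INTO taskmeta VALUES (?, ?)",
    ("0ae578c2-85b2-4d13-9002-50604329a480", "SUCCESS"),
)

handle = FakeAsyncResult("0ae578c2-85b2-4d13-9002-50604329a480")

# Passing the wrapper object itself fails when the driver tries to bind it.
try:
    lookup_status(conn, handle)
    bind_failed = False
except sqlite3.Error as exc:
    bind_failed = True
    print(f"binding the object failed: {exc}")

# Passing the plain string id is something the driver can bind.
status = lookup_status(conn, handle.id)
print(status)
```

If the same reasoning applies to the calling code in the question, then passing the .id of the object returned by delay() to AsyncResult(...), or calling ready() directly on that returned object, would avoid handing the backend a non-string value.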