I am developing an application in which I have to periodically call a function defined in myapp/tasks.py from the celery.py file located in my project directory. But when I add an import statement to celery.py like
from myapp.tasks import test
I get the following error which I am not able to resolve:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Here is the code from __init__.py:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
The code from celery.py:
from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
from myapp.tasks import test
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_monitor_call(sender,**kwargs):
    sender.add_periodic_task(30.0, test.s(), name='call test every 30 seconds')
Now in my app directory there is a file named tasks.py that defines the test function I am calling from celery.py:
from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from celery import shared_task
from celery.utils.log import get_task_logger
logger=get_task_logger(__name__)
def test():
    print("Hello Celery")
Now the exact problem is that when I write the following import statement in celery.py:
from myapp.tasks import test
and then run python manage.py runserver, I get the error:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
The moment I remove the import statement, the server runs correctly. I cannot figure out why.
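
One possible explanation, offered as a sketch rather than a confirmed diagnosis: the project's __init__.py imports celery.py while Django is still starting up, so a module-level import of myapp.tasks runs before the app registry has been populated. If anything on that import path needs the registry (a models import is a common case), Django raises AppRegistryNotReady. The toy model below uses only the standard library to illustrate the ordering issue. ToyRegistry, eager_lookup, deferred_lookup, and the local AppRegistryNotReady class are hypothetical stand-ins, not Django or Celery APIs.

```python
# Toy model of the import-order problem; NOT Django's real implementation.
# Every name here is a hypothetical stand-in used for illustration only.


class AppRegistryNotReady(Exception):
    """Local stand-in for django.core.exceptions.AppRegistryNotReady."""


class ToyRegistry:
    """Registry that refuses lookups until setup() has been called."""

    def __init__(self):
        self.ready = False

    def setup(self):
        # Plays the role of Django finishing its app loading.
        self.ready = True

    def get_model(self, name):
        if not self.ready:
            raise AppRegistryNotReady("Apps aren't loaded yet.")
        return "<model %s>" % name


def eager_lookup(registry):
    # Mirrors a module-level import: the lookup runs before setup().
    try:
        registry.get_model("Thing")
        outcome = "ok"
    except AppRegistryNotReady as exc:
        outcome = "error: %s" % exc
    registry.setup()
    return outcome


def deferred_lookup(registry):
    # Mirrors an import placed inside a callback: nothing is looked up yet.
    def callback():
        return registry.get_model("Thing")

    registry.setup()
    # The callback only runs after setup(), so the lookup succeeds.
    return callback()


print(eager_lookup(ToyRegistry()))
print(deferred_lookup(ToyRegistry()))
```

In celery.py, the analogous change would be to move from myapp.tasks import test from the top of the module into the body of setup_monitor_call, so the import only happens when the signal handler runs. Separately, test.s() exists only on Celery task objects, so test would need to be registered as a task (for example with the shared_task decorator that tasks.py already imports) rather than left as a plain function.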