These are my libs:

celery==5.3.6
django-celery-beat==2.5.0
django-celery-results==2.5.1
redis==5.0.1

settings.py:

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",

    "bot_users.apps.BotUsersConfig",
    "products.apps.ProductsConfig",
    "categories.apps.CategoriesConfig",
    "commands.apps.CommandsConfig",
    "stories_news.apps.StoriesNewsConfig",
    "posts.apps.PostsConfig",
    "polls.apps.PollsConfig",
    "bot_statistics",
    "xml_import",


    "rest_framework",
    "nested_admin",
    "import_export",
    "ckeditor",
    "imagekit",
    "django_celery_beat",
    "django_celery_results",
]


REDIS_HOST = '127.0.0.1'
REDIS_PORT = '6379'
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}'
CELERY_BROKER_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_RESULT_EXTENDED = True
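As a side note, the worker log below prints a `CPendingDeprecationWarning` about `broker_connection_retry`. Assuming the `namespace='CELERY'` config shown in celery.py, the warning can be silenced by adding one more setting here (this is only the fix the warning itself suggests, not related to the scheduling problem):

```python
# Keep retrying the broker connection on startup (Celery 6.0+ behavior);
# silences the CPendingDeprecationWarning seen in the worker log
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
```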

celery.py:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'src.settings')

app = Celery('src')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()
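For completeness: the standard Django integration from the Celery docs also imports this app in the project package's `__init__.py`, so the app is loaded when Django starts. This file isn't shown in the question; the sketch below assumes the project package is `src`:

```python
# src/__init__.py -- ensures the Celery app is loaded with Django,
# so the @app.task decorator binds tasks to the right app instance
from .celery import app as celery_app

__all__ = ('celery_app',)
```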

task function (in services.py):

import logging
import os

from src.celery import app


@app.task
def import_data_from_xml(*args):
    from .admin import ProductResource
    from xml_import.models import XMLImportSettings
    from .formats import XML

    xml_settings = XMLImportSettings.objects.first()
    logging.info(f'xml_settings: {xml_settings}')

    folder_path = xml_settings.folder_path
    logging.info(f'folder_path: {folder_path}')
    file_name = xml_settings.file_name
    logging.info(f'file_name: {file_name}')

    # Full path to the XML file
    xml_file_path = os.path.join(folder_path, file_name)
    logging.info(f'xml_file_path: {xml_file_path}')

    # Create a resource instance
    resource = ProductResource()
    logging.info(f'resource: {resource}')

    xml_formatter = XML()
    logging.info(f'xml_formatter: {xml_formatter}')
    with open(xml_file_path, 'r', encoding='utf-8') as xml_file:
        # Build a dataset from the XML data
        data = xml_formatter.create_dataset(xml_file.read())
        logging.info(f'data: {data}')

    # Import the data from the XML file
    result = resource.import_data(dataset=data, raise_errors=True)
    logging.info(f'result: {result}')

    # Return the import result
    return result
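To separate beat from the worker while debugging, the task can also be triggered by hand from a Django shell (`python manage.py shell`). This is a hypothetical check, assuming the task lives at `products.services` as the logs suggest:

```python
# Run inside `python manage.py shell`.
# Sends the task to the worker directly, bypassing beat entirely --
# if it still never runs, the problem is on the worker side, not beat's.
from products.services import import_data_from_xml

result = import_data_from_xml.delay()
print(result.id, result.status)  # state stays PENDING if the worker never executes it
```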

My Celery configuration seems correct.

These are the Celery consoles. Worker:

[2024-03-15 15:04:56,108: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2024-03-15 15:04:56,116: WARNING/MainProcess] C:\Users\hp\desktop\job\venv\Lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-03-15 15:04:56,182: INFO/MainProcess] mingle: searching for neighbors
[2024-03-15 15:04:57,103: INFO/SpawnPoolWorker-3] child process 2004 calling self.run()
[2024-03-15 15:04:57,234: INFO/SpawnPoolWorker-1] child process 15900 calling self.run()
[2024-03-15 15:04:57,243: INFO/SpawnPoolWorker-4] child process 14456 calling self.run()
[2024-03-15 15:04:57,327: INFO/MainProcess] mingle: all alone
[2024-03-15 15:04:57,391: INFO/SpawnPoolWorker-2] child process 3280 calling self.run()
[2024-03-15 15:04:57,491: INFO/MainProcess] celery@LAPTOP-2554OM7H ready.
[2024-03-15 15:04:57,505: INFO/MainProcess] Task products.services.import_data_from_xml[917a7e74-79d0-47dd-b857-c9cb5c37995d] received
[2024-03-15 15:04:57,509: INFO/MainProcess] Task products.services.import_data_from_xml[5c056738-5974-4f3c-aed9-c824bf8bad0a] received
[2024-03-15 15:05:03,554: INFO/SpawnPoolWorker-5] child process 18000 calling self.run()
[2024-03-15 15:05:04,060: INFO/SpawnPoolWorker-6] child process 3016 calling self.run()
[2024-03-15 15:09:00,978: INFO/MainProcess] Task products.services.import_data_from_xml[1496d6c4-06e2-4117-9571-22d5314eb4b5] received
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-7] child process 868 calling self.run()
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-8] child process 8588 calling self.run()
[2024-03-15 15:09:02,684: INFO/SpawnPoolWorker-9] child process 9280 calling self.run()

beat:

(venv) PS C:\Users\hp\desktop\job> celery -A src beat -l info
celery beat v5.3.6 (emerald-rush) is starting.
__    -    ... __   -        _
LocalTime -> 2024-03-15 15:05:07
Configuration ->
    . broker -> redis://127.0.0.1:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2024-03-15 15:05:07,522: INFO/MainProcess] beat: Starting...
[2024-03-15 15:06:20,632: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2024-03-15 15:06:36,583: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2024-03-15 15:09:00,336: INFO/MainProcess] Scheduler: Sending due task xml_import (products.services.import_data_from_xml)

The function imports data from an XML document into Django. I expect the function to execute (it works 100%, I've checked it), but Celery doesn't run it.

I think the problem is in the configuration, but I don't understand exactly how to find it. Everything seems to be configured properly: beat sends the task and the worker receives it.

1 Answer

I can't see any obvious problem in your code, but I suspect there is something you are not showing us. Try setting up the task logger using celery.utils.log.get_task_logger, so you can see whether the function actually starts. There's a link with more info below.

https://www.programcreek.com/python/example/88906/celery.utils.log.get_task_logger
