88

I am trying to come up with a testing methodology for our django-celery project. I have read the notes in the documentation, but it didn't give me a good idea of what to actually do. I am not worried about testing the tasks in the actual daemons, just the functionality of my code. Mainly I am wondering:

  1. How can we bypass task.delay() during the test (I tried setting CELERY_ALWAYS_EAGER = True but it made no difference)?
  2. How do we use the test settings that are recommended (if that is the best way) without actually changing our settings.py?
  3. Can we still use manage.py test or do we have to use a custom runner?

Overall any hints or tips for testing with celery would be very helpful.

6
  • What do you mean, CELERY_ALWAYS_EAGER makes no difference? Commented Nov 3, 2010 at 9:23
  • I still get errors about not being able to contact RabbitMQ. Commented Nov 3, 2010 at 14:57
  • Do you have the traceback? I guess something other than .delay could be trying to establish a connection. Commented Nov 3, 2010 at 22:37
  • Setting BROKER_BACKEND=memory could help in that case. Commented Nov 3, 2010 at 22:38
  • Ask, you were right. BROKER_BACKEND=memory fixed it. If you post that as an answer I will mark it correct. Commented Jan 4, 2011 at 22:35

6 Answers

77

I like to use the override_settings decorator on tests which need celery results to complete.

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

If you want to apply this to all tests, you can use the Celery test runner described at http://docs.celeryproject.org/en/2.5/django/unit-testing.html, which sets essentially the same settings, except for BROKER_BACKEND = 'memory'.

In settings:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Look at the source for CeleryTestSuiteRunner and it's pretty clear what's happening.
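
Another way to apply the same overrides to every test, without editing settings.py or changing the runner, is a separate settings module. The following is only a sketch under assumptions: the module name myproject/test_settings.py is hypothetical, and the setting names are the Celery 3.x ones used in the decorator above.

```python
# myproject/test_settings.py  (hypothetical module name)
# In a real project, start by pulling in the normal settings:
# from myproject.settings import *  # noqa: F401,F403

# Run tasks inline instead of sending them to a worker.
CELERY_ALWAYS_EAGER = True
# Re-raise task exceptions in the test instead of storing them on the result.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
# Keep any remaining broker connection in-process, so no RabbitMQ is needed.
BROKER_BACKEND = 'memory'
```

The tests could then be run with python manage.py test --settings=myproject.test_settings, which leaves the regular settings.py untouched.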

3 Comments

This didn't work with celery 4, even with the field renames from here
Works on Celery 3.1. I just have my Celery test cases inherit from a parent class with this decorator. That way it's only needed in one place and there's no need to pull in djcelery.
This works great on Celery 4.4 and Django 2.2. It is the best approach for running unit tests that I have come across so far.
47

Try setting:

BROKER_BACKEND = 'memory'

(Thanks to asksol's comment.)

3 Comments

I believe this is no longer necessary when CELERY_ALWAYS_EAGER is set.
Did you find a solution for celery 4?
BROKER_BACKEND apparently is only for Celery 3.x (github.com/celery/celery/blob/v3.1.26/celery/app/utils.py#L72). Celery 4.x/5.x is using CELERY_BROKER_URL = "memory://" (github.com/celery/celery/blob/5.0/celery/contrib/testing/…)
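
To make the renames mentioned in these comments concrete, here is a rough sketch of a helper that maps the Celery 3.x test settings to the names Celery 4+ reads. It assumes the app is configured with config_from_object('django.conf:settings', namespace='CELERY'). The helper name to_celery4_names is hypothetical, and the BROKER_BACKEND mapping follows the comment above rather than anything verified here.

```python
# Celery 3.x setting name -> Celery 4+ name under the CELERY_ namespace.
RENAMED = {
    "CELERY_ALWAYS_EAGER": "CELERY_TASK_ALWAYS_EAGER",
    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": "CELERY_TASK_EAGER_PROPAGATES",
}


def to_celery4_names(old_settings):
    """Return a copy of old_settings using Celery 4+ setting names."""
    new_settings = {}
    for name, value in old_settings.items():
        if name == "BROKER_BACKEND":
            # Per the comment above: 'memory' becomes the URL 'memory://'.
            new_settings["CELERY_BROKER_URL"] = "%s://" % value
        else:
            # Unknown names are passed through unchanged.
            new_settings[RENAMED.get(name, name)] = value
    return new_settings
```

The resulting dictionary can be passed to override_settings as keyword arguments, for example @override_settings(**to_celery4_names({...})).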
19

Here's an excerpt from my testing base class that stubs out the apply_async method and records the calls to it (which includes Task.delay). It's a little gross, but it has met my needs over the past few months I've been using it.

from django.test import TestCase
from celery.task.base import Task
# In recent versions, Task lives in celery.app.task:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_async with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        # apply_async may be called without args/kwargs, so normalize None
        self.applied_tasks.append((task_class, tuple(args or ()), kwargs or {}))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %
                         self.applied_tasks)

Here's an "off the top of the head" example of how you'd use it in your test cases:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)
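
The same "record the call instead of sending it" idea can also be sketched with unittest.mock from the standard library, which restores the original method automatically. This is not the base class above, only an alternative illustration. FakeTask and some_task are hypothetical stand-ins so the snippet runs without Celery installed; in a real project you would patch the delay method of your actual task.

```python
import unittest
from unittest import mock


class FakeTask:
    """Hypothetical stand-in for a Celery task; only delay() is modelled."""

    def delay(self, *args, **kwargs):
        raise RuntimeError("would try to contact the broker")


some_task = FakeTask()


def run_some_task(should_run):
    if should_run:
        some_task.delay(1, some_kwarg=2)


class RunSomeTaskTest(unittest.TestCase):
    def test_should_run(self):
        # Replace delay() with a mock for the duration of the block.
        with mock.patch.object(some_task, "delay") as fake_delay:
            run_some_task(should_run=True)
        fake_delay.assert_called_once_with(1, some_kwarg=2)

    def test_should_not_run(self):
        with mock.patch.object(some_task, "delay") as fake_delay:
            run_some_task(should_run=False)
        fake_delay.assert_not_called()
```

Patching only delay() keeps the test focused on whether the task was queued and with which arguments. It does not execute the task body, so the task logic needs its own tests.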

4

Since I still see this come up in search results: overriding the test runner in settings with

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

worked for me, as described in the Celery docs.

2

For everyone getting here in 2019: check out this article covering different strategies, including calling tasks synchronously.

2

This is what I did.

Inside myapp/tasks.py I have:

from celery import shared_task

@shared_task()
def add(a, b):
    return a + b

Inside myapp/test_tasks.py I have:

from django.test import TestCase, override_settings
from myapp.tasks import add


class TasksTestCase(TestCase):

    def setUp(self):
        ...

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,
                       CELERY_TASK_EAGER_PROPAGATES=True)
    def test_create_sections(self):
        result = add.delay(1, 2)
        self.assertTrue(result.successful())
        self.assertEqual(result.get(), 3)
