I have three Celery tasks:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata(): # run synchronously
notify_report_was_not_updated.delay()
@celery_app.task
def load_rawdata():
# load and process file from FTP
return False # some error happened
@celery_app.task
def notify_rawdata_was_not_updated():
pass # send email by Django
I need to test that email was sent if load_rawdata task (function) returns False. For that I have written some test which does not work:
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=False)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == '[email protected]'
assert mail.outbox[0].to == ['[email protected]']
It seems notify_rawdata_was_not_updated still being run asynchronously.
How to write proper test?
CELERY_ALWAYS_EAGER=True?