I am trying to test some celery functionality in Django's unit testing framework, but whenever I try to check an AsyncResult the tests act like it was never started.
I know this code works in a real environment with RabbitMQ, so I was just wondering why it didn't work when using the testing framework.
Here is an example:
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
CELERY_ALWAYS_EAGER = True,
BROKER_BACKEND = 'memory',)
def test_celery_do_work(self):
result = myapp.tasks.celery_do_work.AsyncResult('blat')
applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')
applied_task.wait()
# THIS SUCCEEDS
self.assertTrue(applied_task.successful())
# THIS FAILS
self.assertTrue(result.successful())
Does using the ALWAYS_EAGER option disable the AsyncResult functionality since it's executing immediately? If so, is there any way to able to unit test AsyncResult status checking? If I try to take out the ALWAYS_EAGER option the tests never run, so I am at a loss.
Thanks!