我使用的是芹菜和Django的芹菜。 我已经定义,我想测试一个周期性任务。 是否可以手动设置,使我查看控制台输出运行从shell周期性任务?
Answer 1:
您是否尝试过只是运行从Django的壳任务? 您可以使用.apply
的任务的方法,以确保它急切地和本地运行。
假设任务被称为my_task
在Django应用程序myapp
在tasks
子模块:
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
结果实例具有相同的API通常AsyncResult
类型,不同的是结果总是急切和本地计算和.apply()
方法将阻塞,直到任务运行到完成。
Answer 2:
我认为你需要打开两个shell:一个从Python / Django的壳执行的任务,以及一个用于运行celery worker
( python manage.py celery worker
)。 和以前的答案说,你可以通过运行任务apply()
或apply_async()
我已经编辑这样你就不会使用过时的命令答案。
Answer 3:
如果你仅仅意味着一个触发任务时的条件没有被满足,例如,周期时间不符合。 您可以在两个步骤去做。
1.Get你的任务ID。
您可以通过键入做到这一点。
celery inspect registered
你会看到类似app.tasks.update_something
。 如果没有的话,可能是celery
没有启动。 只要运行它。
2.运行与任务celery call
celery call app.tasks.update_something
有关详细信息,只需键入
celery --help
celery inspect --help
celery call --help
文章来源: How can I run a celery periodic task from the shell manually?