我怎样才能从运行手动壳芹菜周期性任务?(How can I run a celery periodi

2019-08-01 12:09发布

我使用的是芹菜和Django的芹菜。 我已经定义,我想测试一个周期性任务。 是否可以手动设置,使我查看控制台输出运行从shell周期性任务?

Answer 1:

您是否尝试过只是运行从Django的壳任务? 您可以使用.apply的任务的方法,以确保它急切地和本地运行。

假设任务被称为my_task在Django应用程序myapptasks子模块:

$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()

结果实例具有相同的API通常AsyncResult类型,不同的是结果总是急切和本地计算和.apply()方法将阻塞,直到任务运行到完成。



Answer 2:

我认为你需要打开两个shell:一个从Python / Django的壳执行的任务,以及一个用于运行celery workerpython manage.py celery worker )。 和以前的答案说,你可以通过运行任务apply()apply_async()

我已经编辑这样你就不会使用过时的命令答案。



Answer 3:

如果你仅仅意味着一个触发任务时的条件没有被满足,例如,周期时间不符合。 您可以在两个步骤去做。

1.Get你的任务ID。

您可以通过键入做到这一点。

celery inspect registered

你会看到类似app.tasks.update_something 。 如果没有的话,可能是celery没有启动。 只要运行它。

2.运行与任务celery call

celery call app.tasks.update_something

有关详细信息,只需键入

celery --help
celery inspect --help
celery call --help


文章来源: How can I run a celery periodic task from the shell manually?