如何动态地添加/删除定期任务,芹菜(celerybeat)如何动态地添加/删除定期任务,芹菜(cel

2019-05-13 13:29发布

如果我有一个函数定义如下:

def add(x,y):
  return x+y

有没有一种方法来动态地添加该功能作为芹菜PeriodicTask并在运行时启动它? 我希望能够做到像(伪代码):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

我也想停止或动态地删除该任务有类似的信息(伪代码):

celery.beat.remove_task(some_unique_task_id)

要么

celery.beat.stop(some_unique_task_id)

仅供参考,我不使用djcelery,它可以让你通过Django管理管理定期任务。

Answer 1:

不,我很抱歉,这是不可能与正规celerybeat。

但它很容易扩展到你想要的东西,例如Django的芹菜调度只是一个子类,阅读和写作计划到数据库(与上面一些优化)。

此外,你甚至可以使用Django的芹菜调度非Django的项目。

事情是这样的:

  • 安装Django Django的+ - 芹菜:

    $ PIP安装-U的Django Django的芹菜

  • 将下列设置添加到您的celeryconfig:

     DATABASES = { 'default': { 'NAME': 'celerybeat.db', 'ENGINE': 'django.db.backends.sqlite3', }, } INSTALLED_APPS = ('djcelery', ) 
  • 创建数据库表:

     $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
  • 先从数据库调度celerybeat:

     $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ -S djcelery.schedulers.DatabaseScheduler 

另外还有的djcelerymon可用于非Django的项目开始celerycam并在同一过程中Django管理webserver命令,你可以用它来在一个不错的web界面还可以编辑您的定期任务:

   $ djcelerymon

(注意:由于某种原因djcelerymon不能使用Ctrl + C停止,你必须使用Ctrl + Z +杀%1)



Answer 2:

这个问题被回答了谷歌群体 。

我不是作者,都归功于马克·吉恩

下面是这一个妥善的解决办法。 确认工作,在我的方案,我子类周期性任务,并创建了一个模型出来的,因为我可以添加其他领域的模型,因为我需要还,所以我可以添加“终止”的方法。 你必须周期性任务的enabled属性设置为False,并将其保存在删除它之前。 整个子类是不是必须的,该schedule_every方法是一个真正做的工作。 当你准备终止您的任务(如果你没有继承它),你可以只使用PeriodicTask.objects.filter(名称= ...)来搜索你的任务,禁用它,然后将其删除。

希望这可以帮助!

 from djcelery.models import PeriodicTask, IntervalSchedule from datetime import datetime class TaskScheduler(models.Model): periodic_task = models.ForeignKey(PeriodicTask) @staticmethod def schedule_every(task_name, period, every, args=None, kwargs=None): """ schedules a task by name every "every" "period". So an example call would be: TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. """ permissible_periods = ['days', 'hours', 'minutes', 'seconds'] if period not in permissible_periods: raise Exception('Invalid period specified') # create the periodic task and the interval ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) if interval_schedules: # just check if interval schedules exist like that already and reuse em interval_schedule = interval_schedules[0] else: # create a brand new interval schedule interval_schedule = IntervalSchedule() interval_schedule.every = every # should check to make sure this is a positive int interval_schedule.period = period interval_schedule.save() ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) if args: ptask.args = args if kwargs: ptask.kwargs = kwargs ptask.save() return TaskScheduler.objects.create(periodic_task=ptask) def stop(self): """pauses the task""" ptask = self.periodic_task ptask.enabled = False ptask.save() def start(self): """starts the task""" ptask = self.periodic_task ptask.enabled = True ptask.save() def terminate(self): self.stop() ptask = self.periodic_task self.delete() ptask.delete() 


Answer 3:

有一个叫做Django的芹菜拍它提供人们需要的模型库。 为了使动态加载一个有能力创造自己的调度新定期任务。

from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule


Answer 4:

你可以看看这个烧瓶djcelery其配置瓶中,djcelery并且还提供了浏览的REST API



Answer 5:

这最终被成为可能的修复列入芹菜V4.1.0。 现在,你只需要更改数据库后端的日程条目,芹菜拍将根据新的时间表行事。

该文档隐约描述了这一过程。 芹菜拍,默认调度PersistentScheduler ,采用的是搁置的文件在其日程数据库。 在任何更改beat_schedule在字典PersistentScheduler实例与此数据库同步(默认情况下,每3分钟),反之亦然。 该文档描述了如何将新的条目添加到beat_schedule使用app.add_periodic_task 。 要修改现有条目,只需添加具有相同的新条目name 。 删除条目中可以从词典: del app.conf.beat_schedule['name']

假设你要监视和使用外部应用程序修改您的芹菜拍时间表。 然后,你有几种选择:

  1. 您可以open货架数据库文件并阅读其内容就像一本字典。 写回该文件进行修改。
  2. 您可以运行应用程序芹菜的另一个实例,并使用一个上述修改货架文件。
  3. 您可以使用自定义调度类从Django的芹菜拍的时间表存储在一个Django管理数据库,并访问条目存在。
  4. 您可以使用从调度celerybeat -蒙戈的时间表存储在MongoDB的后端,并访问条目存在。


文章来源: How to dynamically add / remove periodic tasks to Celery (celerybeat)