Celery revoke task before execute using django dat

Posted 2019-02-20 17:16

Question:

I'm using the Django database as the Celery broker instead of RabbitMQ, for concurrency reasons.

But I can't solve the problem of revoking a task before it executes.

I found some answers on this, but they don't seem complete, or I can't get enough help from them.

  • first answer
  • second answer

How can I extend the Celery task table with a model and add a boolean field (revoked) that I can set when I don't want the task to execute?

Thanks.

Answer 1:

Since Celery tracks tasks by an ID, all you really need is a way to tell which IDs have been canceled. Rather than modifying kombu internals, you can create your own table (or use memcached, etc.) that just tracks canceled IDs, then check whether the ID of the current cancelable task is in it.

This is what the transports that support a remote revoke command do internally:

"All worker nodes keeps a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes)." (from the Celery docs)

When you use the django transport, you are responsible for doing this yourself. In this case it's up to each task to check whether it has been canceled.

So the basic form of your task (logging added in place of an actual operation) becomes:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger

from .models import task_canceled

logger = get_task_logger(__name__)

@shared_task
def my_task():
    # Skip the work if this task's ID has been recorded as canceled.
    if task_canceled(my_task.request.id):
        raise Ignore()
    logger.info("Doing my stuff")

You can extend & improve this in various ways, such as by creating a base CancelableTask class as in one of the other answers you linked to, but this is the basic form. What you're missing now is the model and the function to check it.
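
One way to avoid repeating the check in every task is to wrap it in a decorator. The following is only a sketch, not the approach from the linked answers: skip_if_canceled, TaskCanceled and fake_task are hypothetical names, TaskCanceled stands in for celery.exceptions.Ignore, and the fake task object only imitates the request.id attribute of a bound Celery task so the snippet runs without a worker.

```python
import functools
from types import SimpleNamespace


class TaskCanceled(Exception):
    """Hypothetical stand-in for celery.exceptions.Ignore."""


def skip_if_canceled(is_canceled, exc=TaskCanceled):
    """Wrap a bound task body so it checks for cancellation before running."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # On a bound task, self.request.id is the string task ID.
            if is_canceled(self.request.id):
                raise exc()
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


# Demo: an in-memory set replaces the database table.
canceled_ids = {"id-2"}


@skip_if_canceled(lambda task_id: task_id in canceled_ids)
def do_work(self, x):
    return x * 2


def fake_task(task_id):
    # Imitates only the attribute the wrapper reads.
    return SimpleNamespace(request=SimpleNamespace(id=task_id))


print(do_work(fake_task("id-1"), 21))
```

In a real project the wrapper would presumably sit under @shared_task(bind=True), with task_canceled passed as is_canceled and Ignore passed as exc; that combination is an assumption and has not been tested against the django transport.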

Note that the ID in this case will be a string ID like a5644f08-7d30-43ff-a61e-81c165ad9e19, not an integer. Your model can be as simple as this:

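from django.db import models

class CanceledTask(models.Model):
    # Celery task IDs are UUID strings, so store them as text.
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    # Record the ID so the task can see it has been canceled.
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    # True if the ID has been recorded as canceled.
    return CanceledTask.objects.filter(task_id=request_id).exists()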

You can now check the behavior by watching your celery service's debug logs while doing things like:

# Runs normally.
my_task.delay()
# delay() returns an AsyncResult; pass its string ID to cancel the task.
models.cancel_task(my_task.delay().id)
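
If you want to try the cancel-then-check sequence without a worker or a Django project, it can be simulated with the standard library. This is a rough sketch under stated assumptions: sqlite3 stands in for the Django model, run_task is a hypothetical stand-in for the worker executing the task, and the table name is made up for the example.

```python
import sqlite3
import uuid

# In-memory table standing in for the CanceledTask model.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE canceled_task (task_id TEXT PRIMARY KEY)")


def cancel_task(task_id):
    # INSERT OR IGNORE makes canceling the same ID twice harmless.
    conn.execute(
        "INSERT OR IGNORE INTO canceled_task (task_id) VALUES (?)", (task_id,)
    )
    conn.commit()


def task_canceled(task_id):
    row = conn.execute(
        "SELECT 1 FROM canceled_task WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row is not None


def run_task(task_id):
    # Worker side: check the table first, then do the work.
    if task_canceled(task_id):
        return "ignored"
    return "done"


# Task IDs are UUID strings, as with Celery.
first_id = str(uuid.uuid4())
second_id = str(uuid.uuid4())
cancel_task(second_id)

results = [run_task(first_id), run_task(second_id)]
print(results)
```

The check only helps if the cancellation is recorded before the worker picks the task up; once the task body has started, this scheme does not stop it.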