Celery raises an error while passing my queryset obj to a task

Posted 2019-08-11 06:05

Question:

I am trying to execute a periodic task, so I am using Celery with Django 1.8, Django Rest Framework, and Postgres as the database. When I try to send my obj to the task I get TypeError: foreign_model_obj is not JSON serializable. How can I pass my queryset object to my task?

views.py :

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])

        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj], eta=local_time)  # raises TypeError: foreign_model_obj is not JSON serializable
        return Response(result)

tasks.py :

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj):
    # ... updating obj attributes
    mymodel_obj.save()

Answer 1:

You just have to send the id of your instance and retrieve the object within the task. It is bad practice to pass the instance itself, since it can be altered (or deleted) in the meantime, especially as you are executing your task with a delay, as seems to be the case here.

views.py :

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])

        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj.id], eta=local_time) # send only the obj id
        return Response(result)

tasks.py :

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj_id):
    my_model_obj = MyModel.objects.get(id=mymodel_obj_id) # retrieve your object here
    # ... updating obj attributes
    my_model_obj.save()
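
Since the row can be altered or even deleted before the eta fires, a slightly more defensive variant of the task (just a sketch, reusing the same names as above) skips missing objects instead of crashing:

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj_id):
    try:
        my_model_obj = MyModel.objects.get(id=mymodel_obj_id)
    except MyModel.DoesNotExist:
        return  # the row was deleted before the task ran; nothing to do
    # ... updating obj attributes
    my_model_obj.save()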


Answer 2:

Actually, IMHO the best way to go is to extract a picklable component of the queryset and then regenerate the queryset in the task (see https://docs.djangoproject.com/en/1.9/ref/models/querysets/):

import pickle

qs = MyModel.objects.filter(a__in=[1, 2, 3])     # whatever queryset you want here...
querystr = pickle.dumps(qs.query)                # pickle only the underlying Query object
my_celery_task.apply_async([querystr], eta=local_time)  # send only the pickled query...

The task:

import pickle

@celery_app.task(name="my_celery_task")
def my_celery_task(querystr):
    my_model_objs = MyModel.objects.all()
    my_model_objs.query = pickle.loads(querystr)  # restore the queryset
    # ... updating obj attributes
    item = my_model_objs[0]

This is the best approach, I think, because the query gets executed (perhaps for the first time) inside the task, which prevents various timing issues; it does not need to be executed in the caller, so the query is not run twice.
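
One caveat: pickle.dumps returns bytes, which Celery's default JSON serializer will also refuse. A minimal workaround sketch (reusing the MyModel and my_celery_task names from above; this is an assumption, not part of the original answer) is to base64-encode the pickled query into a plain string and decode it back in the task:

import base64
import pickle

# caller side: encode the pickled Query object as an ASCII string
qs = MyModel.objects.filter(a__in=[1, 2, 3])
payload = base64.b64encode(pickle.dumps(qs.query)).decode("ascii")
my_celery_task.apply_async([payload], eta=local_time)

@celery_app.task(name="my_celery_task")
def my_celery_task(payload):
    my_model_objs = MyModel.objects.all()
    my_model_objs.query = pickle.loads(base64.b64decode(payload))  # restore the queryset
    item = my_model_objs[0]  # the query runs here, inside the task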



Answer 3:

You could change the serialization method to pickle, but it is not recommended to pass a queryset as a parameter. Quote from the Celery documentation:

Another gotcha is Django model objects. They shouldn’t be passed on as arguments to tasks. It’s almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.

http://docs.celeryproject.org/en/latest/userguide/tasks.html
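
If you do decide to switch serializers anyway, a minimal configuration sketch (assuming the usual Django/Celery setup where Celery reads CELERY_-prefixed settings; these are standard Celery settings, not taken from the question) looks like this:

# settings.py -- sketch only; pickle will deserialize arbitrary objects,
# so only enable it if the broker is not reachable by untrusted clients
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']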