Python - Retry a failed Celery task from another queue

Posted 2019-02-15 18:07

Question:

I'm posting data to a web service from Celery. Sometimes the data is not posted because the internet connection is down, and the task is retried endlessly until it succeeds. Retrying over and over is unnecessary, because while the connection is down the retries cannot succeed anyway.

I thought of a better solution: if a task fails three times (retrying a minimum of 3 times), it is shifted to another queue that holds all the failed tasks. Once the internet is back up and data is being posted over the net again, i.e. tasks from the normal queue are completing, the worker then starts processing the tasks from the failed-task queue. This avoids wasting CPU and memory on retrying the same task again and again.

Here's my code. As of right now I'm just retrying the task, but I doubt that's the right way of doing it.

from decimal import Decimal

from celery import shared_task
from pysimplesoap.client import SoapClient  # assuming pysimplesoap is the SOAP client in use


@shared_task(default_retry_delay=1 * 60, max_retries=10)
def post_data_to_web_service(data, url):
    try:
        # Build a SOAP client pointing at the given service URL.
        client = SoapClient(
            location=url,
            action='http://tempuri.org/IService_1_0/',
            namespace="http://tempuri.org/",
            soap_ns='soap', ns=False,
        )

        # Post the shipment dimensions to the web service.
        response = client.UpdateShipment(
            Weight=Decimal(data['Weight']),
            Length=Decimal(data['Length']),
            Height=Decimal(data['Height']),
            Width=Decimal(data['Width']),
        )

    except Exception as exc:
        # Any failure (e.g. the connection is down) triggers a retry,
        # up to max_retries times.
        raise post_data_to_web_service.retry(exc=exc)

How do I maintain two queues simultaneously and execute tasks from both of them?

Settings.py

BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

Answer 1:

By default Celery adds all tasks to a queue named celery. So you can run your task there and retry it when an exception occurs; once it reaches the maximum number of retries, you can shift it to a new queue, say foo.

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError


@shared_task(default_retry_delay=1 * 60, max_retries=10)
def post_data_to_web_service(data, url):
    try:
        # do something with the given args (post the data to the web service)
        ...
    except Exception:
        try:
            # Retry on the default queue; once max_retries is exceeded,
            # retry() raises MaxRetriesExceededError (don't pass exc= here,
            # or that exception would be re-raised instead).
            raise post_data_to_web_service.retry()
        except MaxRetriesExceededError:
            # Retries exhausted: hand the task off to the 'foo' queue.
            post_data_to_web_service.apply_async((data, url), queue='foo')

When you start your worker, this task will try to do something with the given data. If it fails, it will retry 10 times with a delay of 60 seconds. Once the retries are exhausted and retry() raises MaxRetriesExceededError, it posts the same task to the new queue foo.
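Callers enqueue the task in the usual way; a minimal sketch (the dimension values and service URL below are made up for illustration):

# Enqueue on the default "celery" queue; the worker retries on failure
# and eventually hands the task off to "foo" as shown above.
post_data_to_web_service.delay(
    {'Weight': '1.5', 'Length': '10', 'Height': '5', 'Width': '4'},  # made-up data
    'http://example.com/shipment-service',                           # made-up URL
)

# apply_async is the explicit equivalent and also lets you pick a queue directly.
post_data_to_web_service.apply_async(
    args=({'Weight': '1.5', 'Length': '10', 'Height': '5', 'Width': '4'},
          'http://example.com/shipment-service'),
    queue='celery',
)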

To consume these tasks, you have to start a new worker:

celery worker -l info -A my_app -Q foo

or you can also consume these tasks from the default worker if you start it with:

celery worker -l info -A my_app -Q celery,foo
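If you'd rather declare the queues explicitly instead of letting them be created implicitly, they can be added to the settings shown in the question. A minimal sketch, assuming the same old-style uppercase setting names and kombu's Queue class:

# settings.py (additions, sketch)
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'celery'
CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),  # normal tasks
    Queue('foo', routing_key='foo'),        # tasks that exhausted their retries
)

With this in place, a worker started with -Q celery,foo consumes from both queues, while -Q foo dedicates a worker to the failed tasks.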