I'm posting data to a web service from a Celery task. Sometimes the data is not posted because the internet connection is down, and the task is retried again and again until the post succeeds. These retries are unnecessary: while the connection is down, trying again cannot succeed.
I thought of a better solution: if a task fails three times (that is, after at least 3 retries), it is moved to another queue that holds all the failed tasks. Once the internet is back up and data is being posted again, that is, once a task from the normal queue has completed, the worker starts processing the tasks in the failed-task queue. This avoids wasting CPU and memory on retrying the same task again and again.
Here's my code. Right now I'm just retrying the task, but I doubt that's the right way to do it.
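from decimal import Decimal

from celery import shared_task
# Assumption: SoapClient is the pysimplesoap client (the import was not shown).
from pysimplesoap.client import SoapClient


# Retry up to 10 times, waiting 60 seconds between attempts.
@shared_task(default_retry_delay=1 * 60, max_retries=10)
def post_data_to_web_service(data, url):
    try:
        client = SoapClient(
            location=url,
            action='http://tempuri.org/IService_1_0/',
            namespace="http://tempuri.org/",
            soap_ns='soap', ns=False
        )
        response = client.UpdateShipment(
            Weight=Decimal(data['Weight']),
            Length=Decimal(data['Length']),
            Height=Decimal(data['Height']),
            Width=Decimal(data['Width']),
        )
    except Exception as exc:
        # Any failure, including a network outage, schedules another attempt.
        raise post_data_to_web_service.retry(exc=exc)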
How do I maintain two queues at the same time and execute tasks from both of them?
Settings.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
By default, Celery adds all tasks to a queue named celery. So you can run your task there; when an exception occurs it is retried, and once it reaches the maximum number of retries you can move it to a new queue, say foo.
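The hand-off described above could look roughly like the sketch below. It is not necessarily the exact code this answer had in mind. The helper name `retry_or_move` and its `failed_queue` parameter are hypothetical; the attributes it touches (`request.retries`, `max_retries`, `retry()`, `apply_async()`) are the ones a bound Celery task exposes. Instead of catching MaxRetriesExceededError, it checks the retry counter before asking for another retry, which should amount to the same decision.

```python
def retry_or_move(task, exc, args, failed_queue="foo"):
    """Retry a bound Celery task, or re-publish it to failed_queue.

    `task` is the bound task instance (the `self` of a task declared with
    bind=True). The helper name and the `failed_queue` default are
    hypothetical, chosen only for this illustration.
    """
    retries_used = task.request.retries  # 0 on the first run
    if task.max_retries is not None and retries_used >= task.max_retries:
        # Another retry() call would exceed max_retries, so publish a fresh
        # copy of the task to the failed-task queue instead.
        task.apply_async(args=args, queue=failed_queue)
        return False
    # retry() signals Celery to reschedule the task by raising an exception.
    raise task.retry(exc=exc)


# Assumed wiring inside the task (needs celery installed; kept as a comment
# so the helper above can be read and tried on its own):
#
# @shared_task(bind=True, default_retry_delay=60, max_retries=10)
# def post_data_to_web_service(self, data, url):
#     try:
#         ...  # post the data to the web service
#     except Exception as exc:
#         retry_or_move(self, exc, args=[data, url])
```

One thing to keep in mind with this design: the copy published to foo starts with a fresh retry counter, so if the connection is still down it will go through its own retries and end up in foo again.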
When you start your worker, this task will try to do something with the given data. If it fails, it will retry 10 times with a delay of 60 seconds. Then, when it encounters MaxRetriesExceededError, it posts the same task to the new queue foo.
To consume these tasks you have to start a new worker that listens on foo, or you can also consume them from the default worker if you start it with both queues (celery and foo) in its -Q option.