Celery creating a new connection for each task

Posted 2019-07-31 01:50

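I'm using Celery with Redis to run some background tasks, but each time a task is called, it creates a new connection to Redis. I'm on Heroku, and my Redis To Go plan allows 10 connections. I hit that limit quickly and get a "max number of clients reached" error.

How can I make sure Celery queues the tasks over a single connection instead of opening a new one each time?

Edit: including the full traceback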

File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
   response = callback(request, *callback_args, **callback_kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
   return view_func(*args, **kwargs)

 File "/app/feedback/views.py", line 264, in zencoder_webhook_handler
   tasks.process_zencoder_notification.delay(webhook)

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay
   return self.apply_async(args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async
   with app.producer_or_acquire(producer) as P:

 File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
   return self.gen.next()

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire
   with self.amqp.producer_pool.acquire(block=True) as producer:

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire
   R = self.prepare(R)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
   p = p()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
   return lambda: self.create_producer()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer
   return self.Producer(self._acquire_connection())

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__
   super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__
   self.revive(self.channel)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive
   channel = self.channel = maybe_channel(channel)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel
   return channel.default_channel

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel
   self.connection

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
   self._connection = self._establish_connection()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
   conn = self.transport.establish_connection()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection
   self._avail_channels.append(self.create_channel(self))

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel
   channel = self.Channel(connection)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__
   self.client.info()

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info
   return self.execute_command('INFO')

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command
   conn.send_command(*args)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command
   self.send_packed_command(self.pack_command(*args))

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command
   self.connect()

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect
   self.on_connect()

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect
   if self.read_response() != 'OK':

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response
   raise response

ResponseError: max number of clients reached

Answer 1:

I wish I were using Redis, because there is a specific option for limiting the number of connections: CELERY_REDIS_MAX_CONNECTIONS

  • http://docs.celeryproject.org/en/3.0/configuration.html#celery-redis-max-connections (3.0)
  • http://docs.celeryproject.org/en/latest/configuration.html#celery-redis-max-connections (3.1)
  • http://docs.celeryproject.org/en/master/configuration.html#celery-redis-max-connections (for dev)

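MongoDB has a similar backend setting.

Given these backend settings, I have no idea what BROKER_POOL_LIMIT actually does. Hopefully CELERY_REDIS_MAX_CONNECTIONS solves your problem.

I'm one of those people using CloudAMQP, and the AMQP backend does not have its own connection limit parameter.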
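As a rough illustration of the setting named in this answer, here is a minimal configuration sketch for a Celery 3.x style settings module. The environment variable name REDISTOGO_URL, the localhost fallback URL, and the value 5 are assumptions chosen for the example, not values from the question. Per the Celery documentation, CELERY_REDIS_MAX_CONNECTIONS caps the Redis pool used for sending and retrieving results, so it may not limit the broker connections that appear in the traceback above.

```python
# celeryconfig.py (sketch; names and values below are illustrative assumptions)
import os

# Assumed: the Redis URL is exposed through an environment variable.
# REDISTOGO_URL and the localhost fallback are placeholders for this example.
BROKER_URL = os.environ.get("REDISTOGO_URL", "redis://localhost:6379/0")

# Reuse the same Redis instance as the result backend.
CELERY_RESULT_BACKEND = BROKER_URL

# Cap the Redis connection pool used for task results.
# 5 is an arbitrary value kept below the plan's 10-connection limit.
CELERY_REDIS_MAX_CONNECTIONS = 5
```

Whatever value is chosen has to leave room for the broker connections opened by the web process and by each worker process, since all of them count against the same 10-client plan limit.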



Answer 2:

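I ran into the same problem on Heroku with CloudAMQP. I don't know why, but I had no luck when assigning low integers to the BROKER_POOL_LIMIT setting.

Ultimately, I found that setting BROKER_POOL_LIMIT=None or BROKER_POOL_LIMIT=0 mitigated my issue. According to the Celery docs, this disables the connection pool. So far this has not caused a noticeable problem for me, but I'm not sure whether it might for you.

Link to the relevant info: http://celery.readthedocs.org/en/latest/configuration.html#broker-pool-limit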
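The workaround described in this answer could be sketched as follows. The helper pool_limit_from_env and the idea of reading the value from an environment variable are hypothetical additions for illustration; only the BROKER_POOL_LIMIT setting itself comes from the answer. With the pool disabled, Celery's documentation says connections are established and closed for every use, which trades connection reuse for a lower number of idle connections.

```python
# settings sketch: disable the broker connection pool (hypothetical helper below)
import os


def pool_limit_from_env(raw):
    """Hypothetical helper: turn an environment string into a BROKER_POOL_LIMIT value.

    Returns None for a missing value, an empty string, "none", or "0";
    the Celery docs describe None and 0 as disabling the pool.
    """
    if raw is None or raw.strip().lower() in ("", "none", "0"):
        return None
    return int(raw)


# Assumption: the limit is read from an environment variable of the same name.
# When the variable is unset, the pool is disabled, as in the answer above.
BROKER_POOL_LIMIT = pool_limit_from_env(os.environ.get("BROKER_POOL_LIMIT"))
```

Hard-coding BROKER_POOL_LIMIT = None in the settings module has the same effect; the helper only makes it easier to try different values per environment.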



Answer 3:

Try these settings:

CELERY_IGNORE_RESULT = True
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True


Answer 4:

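I had a similar issue involving the number of connections and Celery. It wasn't on Heroku, though, and it was Mongo rather than Redis.

I opened the connection at module level in the tasks module, outside the task function definition. At least for Mongo, this allowed the tasks to share the connection.

Hope that helps.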

https://github.com/instituteofdesign/wander/blob/master/wander/tasks.py

mongoengine.connect('stored_messages')

@celery.task(default_retry_delay = 61)
def pull(settings, google_settings, user, folder, messageid):
    '''
    Pulls a message from zimbra and stores it in Mongo
    '''

    try:
        imap = imap_connect(settings, user)
        imap.select(folder, True)
    .......
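The module-level pattern described in this answer can be illustrated without any external service. In the sketch below, FakeConnection is a hypothetical stand-in for a real client (for example the object returned by a Mongo or Redis library), and pull is a plain function standing in for a Celery task body; neither is taken from the linked repository.

```python
import itertools

_ids = itertools.count(1)
opened = []  # records every simulated connection, for illustration only


class FakeConnection(object):
    """Hypothetical stand-in for a real database or broker client."""

    def __init__(self):
        self.id = next(_ids)
        opened.append(self.id)

    def store(self, message):
        return "conn %d stored %s" % (self.id, message)


# Created once, when the module is imported, so every call made
# in this process reuses the same connection object.
CONNECTION = FakeConnection()


def pull(message):
    """Plain function standing in for a task body that uses the shared connection."""
    return CONNECTION.store(message)


results = [pull("msg-%d" % i) for i in range(3)]
print(len(opened))  # one connection for three calls
```

Each worker process imports the module separately, so this shares one connection per process rather than one for the whole deployment.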


Source: Celery creating a new connection for each task