Celery worker variable sharing issues

Posted 2019-04-14 16:24

I am using Python and celery in a project. In the project, I have two files:

celeryconfig.py

BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
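(For reference, these uppercase names are the old-style settings; a rough lowercase equivalent for Celery 4+ is sketched below as an assumption about newer versions, using the `rpc://` result backend since the old `amqp` backend was later deprecated.)

```python
# Approximate Celery 4+ lowercase equivalents of the settings above.
broker_url = "amqp://guest:guest@localhost:5672//"
result_backend = "rpc://"        # successor to the deprecated "amqp" backend
imports = ("example",)
worker_concurrency = 2           # two prefork worker processes
```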

and example.py

from celery.task import task
import hashlib

md5 = hashlib.md5()

@task
def getDigest(text):
    print('Using md5 -', md5)         # every call prints the same module-level object
    md5.update(text.encode('utf-8'))  # hashlib requires bytes on Python 3
    return md5.digest()

In celeryconfig.py, I set CELERYD_CONCURRENCY to 2, so the tasks in my queue are distributed to two different worker processes.

From a Python console, I run:

from example import getDigest
getDigest.delay('foo'); getDigest.delay('bar')

This creates two tasks that are executed simultaneously by the two workers. The problem is that as both worker processes run their task function, getDigest(), they appear to be using the same hash object (md5). The output of celeryd confirms this, as you can see below.

[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870>

For the sake of simplicity I am using hashlib's md5 object here, but in my actual project I am using an object that cannot be accessed and modified by more than one process at a time. As expected, this makes the workers crash.

That brings up the question: how can I modify my code so that each worker process initializes and uses its very own md5 object? Right now they are sharing the same object, causing my application to crash. Is this possible?

1 Answer

Juvenile、少年°
#2 · 2019-04-14 16:49

They're using the same object because your code explicitly tells them to. By creating the object outside the scope of the task and using it within the task, you give all workers access to the shared object. This is a concurrency issue, not strictly a Celery issue. You could use a copy of the object if it is small, or use your own locking strategy. In general, though, if an object is going to be updated by more than one process at a time, it needs some sort of synchronization, and that is outside the scope of Celery.
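A minimal sketch of one fix, assuming nothing else needs the module-level object: construct the hash inside the task body so every invocation gets its own private instance. The `@task` decorator is omitted here so the function runs standalone; in the project above it would be decorated as before.

```python
import hashlib

# Create the md5 object inside the task so each call gets a fresh,
# private instance -- nothing is shared between worker processes,
# and no state accumulates between calls.
def getDigest(text):
    md5 = hashlib.md5()               # per-call object, never shared
    md5.update(text.encode('utf-8'))  # hashlib requires bytes on Python 3
    return md5.hexdigest()

print(getDigest('foo'))  # acbd18db4cc2f85cedef654fccc4a4d8
```

If the real object is expensive to build, Celery's `worker_process_init` signal (from `celery.signals`) can be used to construct one instance per worker process at startup instead of one per call.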
