可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
Is there a way to determine, programatically, that the current module being imported/run is done so in the context of a celery worker?
We've settled on setting an environment variable before running the Celery worker, and checking this environment variable in the code, but I wonder if there's a better way?
回答1:
Simple,
import sys
IN_CELERY_WOKER_PROCESS = False
if len(sys.argv) > 0 and sys.argv[0].endswith('celery')\
and 'worker' in sys.argv:
IN_CELERY_WOKER_PROCESS=True
print ('Im in Celery worker')
http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/
回答2:
Depending on what your use-case scenario is exactly, you may be able to detect it by checking whether the request id is set:
@app.task(bind=True)
def foo(self):
print self.request.id
If you invoke the above as foo.delay()
then the task will be sent to a worker and self.request.id
will be set to a unique number. If you invoke it as foo()
, then it will be executed in your current process and self.request.id
will be None
.
回答3:
Adding a environment variable is a good way to check if the module is being run by celery worker. In the task submitter process we may set the environment variable, to mark that it is not running in the context of a celery worker.
But the better way may be to use some celery signals which may help to know if the module is running in worker or task submitter. For example, worker-process-init signal is sent to each child task executor process (in preforked mode) and the handler can be used to set some global variable indicating it is a worker process.
回答4:
It is a good practice to start workers with names, so that it becomes easier to manage(stop/kill/restart) them. You can use -n
to name a worker.
celery worker -l info -A test -n foo
Now, in your script you can use app.control.inspect
to see if that worker is running.
In [22]: import test
In [23]: i = test.app.control.inspect(['foo'])
In [24]: i.app.control.ping()
Out[24]: [{'celery@foo': {'ok': 'pong'}}]
You can read more about this in celery worker docs
回答5:
You can use the current_worker_task
property from the Celery
application instance class. Docs here.
With the following task defined:
# whatever_app/tasks.py
celery_app = Celery(app)
@celery_app.task
def test_task():
if celery_app.current_worker_task:
return 'running in a celery worker'
return 'just running'
You can run the following on a python shell:
In [1]: from whatever_app.tasks import test_task
In [2]: test_task()
Out[2]: 'just running'
In [3]: r = test_task.delay()
In [4]: r.result
Out[4]: u'running in a celery worker'
Note: Obviously for test_task.delay()
to succeed, you need to have at least one celery worker running and configured to load tasks from whatever_app.tasks
.
回答6:
As of celery 4.2 you can also do this by setting a flag on the worker_ready
signal
in celery.py
:
from celery.signals import worker_ready
app = Celery(...)
app.running = False
@worker_ready.connect
def set_running(*args, **kwargs):
app.running = True
Now you can check within your task by using the global app instance
to see whether or not you are running. This can be very useful to determine which logger to use.