How can I detect whether I'm running in a Cele

2019-02-25 10:51发布

问题:

Is there a way to determine, programatically, that the current module being imported/run is done so in the context of a celery worker?

We've settled on setting an environment variable before running the Celery worker, and checking this environment variable in the code, but I wonder if there's a better way?

回答1:

Simple,

import sys

IN_CELERY_WOKER_PROCESS = False

if len(sys.argv) > 0 and sys.argv[0].endswith('celery')\
        and 'worker' in sys.argv:
    IN_CELERY_WOKER_PROCESS=True
    print ('Im in Celery worker')

http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/



回答2:

Depending on what your use-case scenario is exactly, you may be able to detect it by checking whether the request id is set:

@app.task(bind=True)
def foo(self):
    print self.request.id

If you invoke the above as foo.delay() then the task will be sent to a worker and self.request.id will be set to a unique number. If you invoke it as foo(), then it will be executed in your current process and self.request.id will be None.



回答3:

Adding a environment variable is a good way to check if the module is being run by celery worker. In the task submitter process we may set the environment variable, to mark that it is not running in the context of a celery worker.

But the better way may be to use some celery signals which may help to know if the module is running in worker or task submitter. For example, worker-process-init signal is sent to each child task executor process (in preforked mode) and the handler can be used to set some global variable indicating it is a worker process.



回答4:

It is a good practice to start workers with names, so that it becomes easier to manage(stop/kill/restart) them. You can use -n to name a worker.

celery worker -l info -A test -n foo

Now, in your script you can use app.control.inspect to see if that worker is running.

In [22]: import test

In [23]: i = test.app.control.inspect(['foo'])

In [24]: i.app.control.ping()
Out[24]: [{'celery@foo': {'ok': 'pong'}}]

You can read more about this in celery worker docs



回答5:

You can use the current_worker_task property from the Celery application instance class. Docs here.

With the following task defined:

 # whatever_app/tasks.py

 celery_app = Celery(app)

 @celery_app.task
 def test_task():
     if celery_app.current_worker_task:
         return 'running in a celery worker'
     return 'just running'

You can run the following on a python shell:

In [1]: from whatever_app.tasks import test_task

In [2]: test_task()
Out[2]: 'just running'

In [3]: r = test_task.delay()

In [4]: r.result
Out[4]: u'running in a celery worker' 

Note: Obviously for test_task.delay() to succeed, you need to have at least one celery worker running and configured to load tasks from whatever_app.tasks.



回答6:

As of celery 4.2 you can also do this by setting a flag on the worker_ready signal

in celery.py:


from celery.signals import worker_ready
app = Celery(...)
app.running = False

@worker_ready.connect
def set_running(*args, **kwargs):
    app.running = True


Now you can check within your task by using the global app instance to see whether or not you are running. This can be very useful to determine which logger to use.



标签: python celery