I'm trying to shut down the main celery process by raising SystemExit() in the task_postrun signal. The signal fires just fine and the exception gets raised, but the worker never completely exits and just hangs there.
HOW DO I MAKE THIS WORK?
Am I forgetting some setting somewhere?
Below is the code that I'm using for the worker (worker.py):
from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current=True,
    broker='amqp://guest@localhost//',
    backend="mongodb://localhost//",
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)
print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")
and below is the code for tasks.py:
from celery import Celery, task
from celery.signals import task_postrun
import time
from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    # The first parameter of run() is the task instance itself; the single
    # positional argument passed to delay() arrives as config.
    def run(self, config):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery
To test this, the first thing I do is run the worker code (python worker.py), then I queue up a task like so:
>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))
The output I see from the worker is this:
STARTING WORKER
-------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events: ON
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)
I was expecting the python code to return from the call to app.worker_main(), then to print WORKER EXITED!, and then for the process to exit completely. This never happens, and the worker process has to be killed with kill -KILL {PID} for it to go away (it's not able to consume any other tasks either).
I guess my main question would be:
HOW DO I MAKE THE CODE RETURN FROM app.worker_main()?
I'd like to be able to completely restart the worker process (by having the process COMPLETELY exit) after X number of tasks have been executed.
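For the "after X tasks" part, a minimal sketch of a counting handler might look like the following. The name make_exit_after and the wiring shown in the comments are hypothetical, not part of Celery; the handler only decides when to raise SystemExit, and the worker still has to be able to shut down cleanly once it does.

```python
import threading


def make_exit_after(max_tasks):
    """Return a signal-style handler that raises SystemExit on the max_tasks-th call.

    make_exit_after is a hypothetical helper for illustration only.
    """
    state = {"count": 0}
    lock = threading.Lock()

    def handler(**kwargs):
        # Count every finished task; the lock keeps the counter consistent
        # if the handler is ever called from more than one thread.
        with lock:
            state["count"] += 1
            reached = state["count"] >= max_tasks
        if reached:
            raise SystemExit()

    return handler


# Hypothetical wiring in worker.py (keep a module-level reference to the
# handler so it is not garbage collected while connected):
# shutdown_after_ten = make_exit_after(10)
# signals.task_postrun.connect(shutdown_after_ten)
```

With max_tasks set to 1 this behaves like the shutdown_worker function in worker.py.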
UPDATE: I figured out what the worker is hanging on. The worker (WorkController) is hanging on a call to self.stop after it catches the SystemExit exception.
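For anyone trying to find a hang like this in their own process, one generic way is to dump the stack of every live thread and look for the one that is blocked. The sketch below uses only the standard library; format_thread_stacks is a hypothetical helper name, and this is not how Celery itself reports thread state.

```python
import sys
import threading
import traceback


def format_thread_stacks():
    """Return a text report with the current stack of every live thread."""
    # Map thread identifiers to readable names where possible.
    names = {t.ident: t.name for t in threading.enumerate()}
    parts = []
    for ident, frame in sys._current_frames().items():
        parts.append("Thread %s (%s):\n" % (names.get(ident, "?"), ident))
        parts.append("".join(traceback.format_stack(frame)))
        parts.append("\n")
    return "".join(parts)


# Hypothetical usage: call this from a debugger or a timer thread while the
# process is hung, then look for a frame that is sitting inside join().
# sys.stderr.write(format_thread_stacks())
```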
Hate it when I end up answering my own question.
Anywhoo, it was blocking on a join call on the Mediator component inside the WorkController (it calls stop() on the Mediator component, and inside stop, it joins).
I got rid of the Mediator component by disabling all rate limits (this should be the default, but it's not for some reason).
You can disable all rate limits with the setting:
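The setting name is missing above. The Celery 3.0 configuration reference documents a CELERY_DISABLE_RATE_LIMITS option for this, so presumably that is the one meant; treat the name as an assumption and check it against your Celery version. A sketch of the worker settings with it added (WORKER_SETTINGS is a hypothetical name):

```python
# Settings from worker.py plus the rate-limit switch.
# CELERY_DISABLE_RATE_LIMITS is an assumption taken from the Celery 3.0
# configuration reference; the post itself does not show the setting name.
WORKER_SETTINGS = {
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
    "CELERY_DISABLE_RATE_LIMITS": True,
}

# Hypothetical wiring, using the app object defined in worker.py:
# app.conf.update(WORKER_SETTINGS)
```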
Hope this helps somebody else down the road too.
Peace