Celery & RabbitMQ running as docker containers: Re

Posted 2019-03-09 20:43

Question:

I am relatively new to Docker, Celery and RabbitMQ.

In our project we currently have the following setup: 1 physical host with multiple docker containers running:

1x rabbitmq:3-management container

# pull image from docker hub and install
docker pull rabbitmq:3-management
# run docker image
docker run -d -e RABBITMQ_NODENAME=my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

1x celery container

# pull docker image from docker hub
docker pull celery
# run celery container
docker run --link some-rabbit:rabbit --name some-celery -d celery

(There are some more containers, but they should have nothing to do with the problem.)

Task File

To get to know celery and rabbitmq a bit, I created a tasks.py file on the physical host:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest:guest@172.17.0.81/')

@app.task(name='tasks.add')
def add(x, y):
    return x + y

The whole setup seems to be working quite fine actually. So when I open a python shell in the directory where tasks.py is located and run

>>> from tasks import add
>>> add.delay(4,4)

The task gets queued and is immediately picked up by the celery worker.

However, according to the logs, the celery worker does not know the tasks module:

$ docker logs some-celery


[2015-04-08 11:25:24,669: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'callbacks': None, 'timelimit': (None, None), 'retries': 0, 'id': '2b5dc209-3c41-4a8d-8efe-ed450d537e56', 'args': (4, 4), 'eta': None, 'utc': True, 'taskset': None, 'task': 'tasks.add', 'errbacks': None, 'kwargs': {}, 'chord': None, 'expires': None} (256b)
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/site-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'tasks.add'

So the problem seems to be that the celery workers in the celery container do not know the tasks module. As I am not a docker specialist, I wanted to ask: how would I best import the tasks module into the celery container?

Any help is appreciated :)


EDIT 4/8/2015, 21:05:

Thanks to Isowen for the answer. Just for completeness here is what I did:

Let's assume my tasks.py is located on my local machine in /home/platzhersh/celerystuff. Now I created a celeryconfig.py in the same directory with the following content:

CELERY_IMPORTS = ('tasks',)
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = 'amqp'
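
A small Python detail about the CELERY_IMPORTS line: parentheses alone do not create a tuple, only a trailing comma does. The snippet below is just an illustration of that language rule (the variable names are made up for the example). Whether a bare string is tolerated depends on how the Celery version in use normalizes this setting, so the tuple form is the safer spelling.

```python
# Parentheses only group an expression; the comma is what builds a tuple.
as_string = ('tasks')    # this is simply the str 'tasks'
as_tuple = ('tasks',)    # one-element tuple containing 'tasks'

print(type(as_string).__name__, type(as_tuple).__name__)

# Iterating the string yields single characters, not module names.
print(list(as_string))
print(list(as_tuple))
```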

As mentioned by Isowen, celery searches /home/user of the container for tasks and config files. So we mount /home/platzhersh/celerystuff into the container when starting it:

docker run -v /home/platzhersh/celerystuff:/home/user --link some-rabbit:rabbit --name some-celery -d celery

This did the trick for me. Hope this helps some other people with similar problems. I'll now try to expand that solution by putting the tasks also in a separate docker container.

Answer 1:

As you suspect, the issue is that the celery worker does not know the tasks module. There are two things you need to do:

  1. Get your tasks definitions "into" the docker container.
  2. Configure the celery worker to load those task definitions.
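
To see why both steps matter, here is a toy model of name-based task dispatch. It is not Celery's actual code: TaskRegistry, task and dispatch are hypothetical names chosen for illustration. The point it sketches is that the message only carries the task's name, so a worker process that never imported the defining module has nothing to look up.

```python
# Toy model of name-based task dispatch. This is NOT Celery's real code;
# TaskRegistry and its methods are hypothetical names used for illustration.

class TaskRegistry:
    def __init__(self):
        self._tasks = {}

    def task(self, name):
        """Decorator that registers a function under an explicit name."""
        def decorator(func):
            self._tasks[name] = func
            return func
        return decorator

    def dispatch(self, message):
        """Look up the task named in the message and call it."""
        func = self._tasks[message["task"]]  # KeyError if never registered
        return func(*message["args"], **message["kwargs"])


# The queued message names the task; it does not contain the function itself.
message = {"task": "tasks.add", "args": (4, 4), "kwargs": {}}

# A worker that never imported the tasks module has an empty registry.
worker = TaskRegistry()
try:
    worker.dispatch(message)
    outcome = "ran"
except KeyError as exc:
    outcome = f"unregistered task: {exc}"

# Importing the module runs the decorator, which is what fills the registry.
@worker.task(name="tasks.add")
def add(x, y):
    return x + y

result = worker.dispatch(message)
print(outcome)
print(result)
```

In this model, step 1 corresponds to making the file containing add available to the worker, and step 2 to making the worker actually import it so the decorator runs.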

For Item (1), the easiest way is probably to use a "Docker Volume" to mount a host directory of your code onto the celery docker instance. Something like:

docker run --link some-rabbit:rabbit -v /path/to/host/code:/home/user --name some-celery -d celery 

Where /path/to/host/code is your host path, and /home/user is the path to mount it on inside the container. Why /home/user in this case? Because the Dockerfile for the celery image defines the working directory (WORKDIR) as /home/user.

(Note: Another way to accomplish Item (1) would be to build a custom docker image with the code "built in", but I will leave that as an exercise for the reader.)

For Item (2), you need to create a celery configuration file that imports the tasks file. This is a more general issue, so I will point to a previous stackoverflow answer: Celery Received unregistered task of type (run example)
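
As a quick sanity check after both steps, one option (not part of the setup above, just a sketch) is a short script saved in the mounted directory and run with the container's Python. It assumes the worker resolves modules from that same directory; missing_modules is a hypothetical helper name, and it only handles top-level module names.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that Python cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # 'tasks' and 'celeryconfig' are the file names used in this thread.
    missing = missing_modules(["tasks", "celeryconfig"])
    if missing:
        print("not importable from here:", ", ".join(missing))
    else:
        print("tasks and celeryconfig are both importable")
```

If either name is reported as not importable, the volume mount (Item 1) is the first thing to recheck; if both are found but the worker still rejects the task, the configuration (Item 2) is the more likely culprit.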