Celery &Rabbitmq:WARNING/MainProcess] Received and

2019-02-13 14:26发布

Recently , I am doing an experiment on a GIT project to understanding the big data processing framework.

1、GIT project:https://github.com/esperdyne/celery-message-processing

we have the following components:

1、AMPQ broker(RabbitMQ): it works as a message buffer, which works as a mail-box to exchange messages for different user!

2、worker: it works as the service-server to provide service for various service client. 3、Queue("celery":it works as a multi-processing container which is used to handle the various worker instances at the same time.

the key configuration can be seen as bellow:

We use the object proj/celery.py to define the app, the definition can be seen as below:

app = Celery('proj',
         broker='amqp://',
         backend='redis://localhost',
         include=['proj.tasks'])

enter code here

when we start the app:

1、 when we start the application, we have seen the message which is produced from the rabbitmq, yet the celery could not handle the message.

Parse.log looks like this:[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

we have the following question:

4.2.1 AMQP mechanism enter image description here We can see that the AMQP works as the message buffer, then there will be a message sender and a message fetcher:

In the above diagram , who is the message sender and who is the message fetcher.

4.2.2 Message definition In our application , we can not find the code to define the Message to send ,or to receive form the AMQP.

4.2.3 Message monitor How can we monitor the Message send and receive in the AMQP. Hope a teacher will guide us to solve the problem , and give us some detailed

introduction on the celery broker mechenism!

note : the error log can be seen here

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?



 The full contents of the message body was: body: [[u'maildir/allen-       p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': 'gen17347@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': 'gen19722@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}

enter code here

2条回答
Deceive 欺骗
2楼-- · 2019-02-13 15:03

Just so that the answer is located here as well. In the thread Anis refers to 23doors mentions that Celery 4's new default protocol does not play nice with librabbitmq:

Apparently librabbitmq issue is related to new default protocol in celery 4.x.

He also mentions that to resolve this issue you can make use of the older protocol Celery offers by setting (if you're using Django):

CELERY_TASK_PROTOCOL = 1 

Otherwise you can set the following in your celeryconf.py file

app.conf.task_protocol = 1

All credit to 23doors :)

查看更多
我想做一个坏孩纸
3楼-- · 2019-02-13 15:12

It would be helpful to give the versions of celery and librabbitmq you are using. Since I had a very similar problem, I'll guess that you are using celery 4.0.2 and librabbitmq 1.6.1.

In such case, this is a known compatibility issue, you can refer to https://github.com/celery/celery/issues/3675 and https://github.com/celery/librabbitmq/issues/93.

The first link gives you recommendation to solve your problem namely:

  • uninstall librabbitmq pip uninstall librabbitmq (you may have to call this command many times)

  • change the occurrences of amqp to pyamqp in your borker urls. (Though not in your config file if your are using one. Doing that did not work for me).

To answer more precisely your other questions: you are right saying that there is a sender and a fetcher.

The sender role is assumed by the app created when you call Celery(...). One of its role is to act as a registry of tasks, and if you look at its implementation in app/base.py, you'll see that it implements a method send_task which is directly called by the method apply_async of the Task class. This method's role is to send a marshalled version of your task through the wire up to the broker so it can be fetched by a worker. The application protocol used to transmit the message is amqp, for which an implementation is librabbitmq.

On the other side of the wire, there is another instance, launched by the worker which does the fetching work. In celery's parlance, it is called a Consumer. You can find its implementation in worker/consumer/consumer.py. You will see that it implements a create_task_handler which in turns defines the on_task_received functions that raises the error you are seeing. It is the function called when a new task is fetched from the worker and next in line to by processed.

The solution suggested therefore consists in changing the implementation of the amqp protocol so that a TypeError is not raised in on_task_received (which it seems to me would be caused by an encoding issue).

I hope it answers all your questions and gives you a clearer view of how celery works. I should end by saying that to my knowledge a "conventional" use of Celery would never require you to tamper with those kind of internals, and that you can achieve 99% of what you may want by implementing custom task classes and custom backends for example.

查看更多
登录 后发表回答