Celery Consumer for SQS Messages

Posted 2020-07-29 17:23

Question:

I am new to Celery and SQS, and would like to use them to periodically check for messages stored in SQS and then fire a consumer. The consumer and Celery both live on EC2, while the messages are sent from GAE using the boto library.

Currently, I am confused about:

  1. In the message body of creating_msg_gae.py, what task information should I put here? I assume it should be the registered name of my Celery task? (See my guess after the snippet below.)
  2. In the message body of creating_msg_gae.py, is url the argument to be processed by my consumer (the function do_something_url(url) in tasks.py)?
  3. I currently run Celery with celery worker -A celery_c -l info; from the command-line output it looks like Celery already checks SQS periodically. Do I need to create a PeriodicTask in Celery instead?

I really appreciate any suggestions to help me with this issue.

creating_msg_gae.py

import base64
import json

from boto import sqs

conn = sqs.connect_to_region("us-east-1",
                             aws_access_key_id='aaa',
                             aws_secret_access_key='bbb')
my_queue = conn.get_queue('uber_batch')
msg = {'properties': {'content_type': 'application/json',
                      'content_encoding': 'utf-8',
                      'body_encoding': 'base64',
                      'delivery_tag': None,
                      'delivery_info': {'exchange': None, 'routing_key': None}}}
body = {'id': 'theid',
        ###########Question 1#######
        'task': 'what task name I should put here?',
        'url': ['my_s3_address']}
msg.update({'body': base64.encodestring(json.dumps(body))})
my_queue.write(my_queue.new_message(json.dumps(msg)))
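
From the Celery message-protocol docs, my current guess for questions 1 and 2 is that the body needs the fully qualified, registered task name under a task key and the positional arguments under an args key. A sketch of what I mean (the exact keys here are my assumption, not something I have verified):

import uuid

# My guess at the task body: 'task' is the registered name (module.function),
# and the url list goes into 'args' instead of a custom 'url' key.
body = {'id': str(uuid.uuid4()),           # a unique task id
        'task': 'tasks.do_something_url',  # registered task name
        'args': [['my_s3_address']],       # the single positional argument url
        'kwargs': {},
        'retries': 0}
# ...then base64-encode and wrap it into msg exactly as in the snippet above.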

My Celery project layout looks like:

./ce_folder/
            celery_c.py, celeryconfig.py, tasks.py, __init__.py

celeryconfig.py

import os
BROKER_BACKEND = "SQS"
AWS_ACCESS_KEY_ID = 'aaa'
AWS_SECRET_ACCESS_KEY = 'bbb'
os.environ.setdefault("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID)
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY)
BROKER_URL = 'sqs://'

# One dict: assigning BROKER_TRANSPORT_OPTIONS three separate times would keep
# only the last assignment and silently drop the region and visibility timeout.
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'visibility_timeout': 60,
    'polling_interval': 30,
}

CELERY_DEFAULT_QUEUE = 'uber_batch'
CELERY_DEFAULT_EXCHANGE = CELERY_DEFAULT_QUEUE
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'  # must be a real exchange type, not the queue name
CELERY_DEFAULT_ROUTING_KEY = CELERY_DEFAULT_QUEUE
CELERY_QUEUES = {
    CELERY_DEFAULT_QUEUE: {
        'exchange': CELERY_DEFAULT_QUEUE,
        'binding_key': CELERY_DEFAULT_QUEUE,
    }
}         

celery_c.py

from __future__ import absolute_import
from celery import Celery

app = Celery('uber')
app.config_from_object('celeryconfig')

if __name__ == '__main__':
    app.start()

tasks.py

from __future__ import absolute_import
from celery_c import app

@app.task
def do_something_url(url):
    # download file from url
    # do some calculations
    # upload result files to s3 and return the result url
    return result_url
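
For comparison, if the producer side could run Celery itself, the body and envelope would be built automatically. A sketch (it assumes the producer can import tasks.py and is configured with the same SQS broker and uber_batch queue):

from tasks import do_something_url

# Celery serializes the id/task/args body and the SQS envelope itself,
# so no hand-crafted JSON message is needed:
result = do_something_url.apply_async(args=[['my_s3_address']],
                                      queue='uber_batch')
print(result.id)  # the task id that ends up in the message body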