I am new to Celery and SQS, and would like to use Celery to periodically check messages stored in SQS and then fire a consumer. The consumer and Celery both live on EC2, while the messages are sent from GAE using the boto library.
Currently, I am confused about:
- In the message body in `creating_msg_gae.py`, what `task` information should I put? I assume this would be the name of my Celery task?
- In the message body in `creating_msg_gae.py`, is `url` treated as the argument to be processed by my consumer (the function `do_something_url(url)` in `tasks.py`)?
- Currently, I am running Celery from the command line with `celery worker -A celery_c -l info`, and it seems that Celery checks SQS periodically. Do I need to create a `PeriodicTask` in Celery instead?
I would really appreciate any suggestions to help me with this issue.
creating_msg_gae.py
import base64
import json

from boto import sqs

conn = sqs.connect_to_region("us-east-1",
                             aws_access_key_id='aaa',
                             aws_secret_access_key='bbb')
my_queue = conn.get_queue('uber_batch')

# Envelope properties sent alongside the encoded body
msg = {'properties': {'content_type': 'application/json',
                      'content_encoding': 'utf-8',
                      'body_encoding': 'base64',
                      'delivery_tag': None,
                      'delivery_info': {'exchange': None, 'routing_key': None}}}
body = {'id': 'theid',
        # Question 1
        'task': 'what task name I should put here?',
        'url': ['my_s3_address']}
# The body is JSON-serialized, then base64-encoded
msg.update({'body': base64.encodestring(json.dumps(body))})
my_queue.write(my_queue.new_message(json.dumps(msg)))
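
To make the message layout above concrete, here is a minimal sketch of the same envelope being built and then unpacked again, using only the standard library (no boto or SQS). It is written for Python 3, so the body is encoded to bytes before base64. `build_envelope` and `read_envelope` are hypothetical helper names, and whether Celery's SQS transport unpacks messages exactly this way is an assumption rather than something confirmed here.

```python
import base64
import json


def build_envelope(body):
    """Wrap a body dict in the envelope layout used in creating_msg_gae.py."""
    # JSON-serialize the body, then base64-encode it as text
    encoded = base64.b64encode(json.dumps(body).encode("utf-8")).decode("ascii")
    return json.dumps({
        "body": encoded,
        "properties": {
            "content_type": "application/json",
            "content_encoding": "utf-8",
            "body_encoding": "base64",
            "delivery_tag": None,
            "delivery_info": {"exchange": None, "routing_key": None},
        },
    })


def read_envelope(raw):
    """Reverse build_envelope: parse the outer JSON, then decode the body."""
    envelope = json.loads(raw)
    return json.loads(base64.b64decode(envelope["body"]).decode("utf-8"))


# Same placeholder values as in the question
body = {"id": "theid", "task": "placeholder_task_name", "url": ["my_s3_address"]}
raw = build_envelope(body)
decoded = read_envelope(raw)
```

Round-tripping the message locally like this is a cheap way to check that the payload is well formed before it is written to the queue.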
My Celery project directory looks like:
./ce_folder/
    celery_c.py, celeryconfig.py, tasks.py, __init__.py
celeryconfig.py
import os
BROKER_BACKEND = "SQS"
AWS_ACCESS_KEY_ID = 'aaa'
AWS_SECRET_ACCESS_KEY = 'bbb'
os.environ.setdefault("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID)
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY)
BROKER_URL = 'sqs://'
# A single dict: assigning BROKER_TRANSPORT_OPTIONS three times keeps only the last value
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'visibility_timeout': 60,
    'polling_interval': 30,
}
CELERY_DEFAULT_QUEUE = 'uber_batch'
CELERY_DEFAULT_EXCHANGE = CELERY_DEFAULT_QUEUE
CELERY_DEFAULT_EXCHANGE_TYPE = CELERY_DEFAULT_QUEUE
CELERY_DEFAULT_ROUTING_KEY = CELERY_DEFAULT_QUEUE
CELERY_QUEUES = {
    CELERY_DEFAULT_QUEUE: {
        'exchange': CELERY_DEFAULT_QUEUE,
        'binding_key': CELERY_DEFAULT_QUEUE,
    }
}
celery_c.py
from __future__ import absolute_import
from celery import Celery
app = Celery('uber')
app.config_from_object('celeryconfig')
if __name__ == '__main__':
    app.start()
tasks.py
from __future__ import absolute_import
from celery_c import app
@app.task
def do_something_url(url):
    # ... download file from url
    # ... do some calculations
    # ... upload results files to S3 and get the result URL
    result_url = None  # placeholder: would be set by the upload step above
    return result_url
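
For reference, here is a sketch of how the body might be laid out if it followed Celery's version 1 task message format, where the body carries `id`, `task`, `args` and `kwargs`. It assumes a Celery 3.x worker, that the task is registered under the default module-plus-function name `tasks.do_something_url`, and that the URL travels in `args` rather than under a custom `url` key. `build_task_body` is a hypothetical helper, and none of this has been verified against the setup above.

```python
import json


def build_task_body(task_id, task_name, url):
    """Body dict shaped like a version 1 Celery task message (assumed format)."""
    return {
        "id": task_id,
        "task": task_name,  # registered task name (assumed default naming)
        "args": [url],      # positional arguments for do_something_url(url)
        "kwargs": {},       # no keyword arguments in this sketch
    }


# Assumed task name: module "tasks" plus function "do_something_url"
body = build_task_body("theid", "tasks.do_something_url", "my_s3_address")
serialized = json.dumps(body, sort_keys=True)
```

If this layout is right, the worker would call `do_something_url("my_s3_address")`, with the single element of `args` bound to the `url` parameter.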