Jobs not executing via Airflow that runs celery wi

2019-02-15 05:14发布

Below is the config im using

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags

# The folder where airflow should store its log files. This location
base_log_folder = /root/airflow/logs

# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
#executor = LocalExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/centos/airflow/airflow.db
sql_alchemy_conn = mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above


# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/


# Another key Celery setting
celery_result_backend = db+mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5556

# Default queue that tasks get assigned to and that worker listen on.
default_queue = = default

But jobs dont run.. the scheduler shows that it is checking for the state as below

[2017-05-11 05:09:13,070] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-13 00:00:00: scheduled__2015-06-13T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,072] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-14 00:00:00: scheduled__2015-06-14T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,074] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-15 00:00:00: scheduled__2015-06-15T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,076] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-16 00:00:00: scheduled__2015-06-16T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,078] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 04:46:29: manual__2017-05-10T04:46:28.756946, externally triggered: True>
[2017-05-11 05:09:13,080] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 05:08:20: manual__2017-05-10T05:08:20.252573, externally triggered: True>

Airflow UI is up and running. Cerlery flower doesnt show any workers. My jobs are not running.

Below is the sequence i follow to get started.

Airflow scheduler

airflow webserver -p 8080

Airflow worker

Is there anything i am missing?

1条回答
神经病院院长
2楼-- · 2019-02-15 05:40

Without knowing what version of Airflow you are running and how you have configured your rabbitmq-server, it is somewhat difficult to answer your question with surety. However, I can offer some things for you to look into.

Here is the Celery documentation for specifying a broker URL. The broker URL in your airflow.cfg does not specify a virtual host, so per the documentation the default virtual host will be used. I did some digging but could not find what the default virtual host for pyampq is, but this is worth looking in to.

Alternatively, you could explicitly configure a virtual host using rabbitmqctl. Here is a post where someone lays out how to do this with Airflow. I have copy and pasted the relevant information below:

# Configure RabbitMQ: create user and grant privileges
rabbitmqctl add_user rabbitmq_user_name rabbitmq_password
rabbitmqctl add_vhost rabbitmq_virtual_host_name
rabbitmqctl set_user_tags rabbitmq_user_name rabbitmq_tag_name
rabbitmqctl set_permissions -p rabbitmq_virtual_host_name rabbitmq_user_name ".*" ".*" ".*"

Finally, you may be running into an issue with the Celery version you are using. At time of posting, Celery 4.X.X does not play nicely with Airflow. Try uninstalling Celery and reinstalling a version that works.

pip uninstall celery
pip install celery==3.1.7
查看更多
登录 后发表回答