Airflow 1.10.0 via Ansible

Published 2019-06-08 01:49

Question:

Below is my Ansible code, which tries to install Airflow 1.10.0.

The output of sudo journalctl -u airflow-webserver -e is:

Dec 31 12:13:48 ip-10-136-94-232.eu-central-1.compute.internal airflow[22224]: ProgrammingError: (_mysql_exceptions.ProgrammingError) (1146, "Table 'airflow.log' doesn't exist") [SQL: u'INSERT INTO log (dttm, dag_id,

The output of sudo journalctl -u airflow-scheduler -e is:

Dec 31 12:14:19 ip-10-136-94-232.eu-central-1.compute.internal airflow[22307]: ProgrammingError: (_mysql_exceptions.ProgrammingError) (1146, "Table 'airflow.log' doesn't exist") [SQL: u'INSERT INTO log (dttm, dag_id,

install.yml

---
- name: Airflow | Install | Basic Packages
  yum:
     name: "{{ packages }}"
  vars:
    packages:
    - gcc
    - gcc-c++
    - zlib-devel
    - bzip2-devel
    - openssl-devel
    - ncurses-devel
    - sqlite-devel
    - cyrus-sasl-devel
    - postgresql
    - postgresql-server
    - mariadb-server
    - mariadb
    - python2-pip
    - python2-devel
    - mysql-devel
    - python-setuptools
    - java-1.8.0-openjdk.x86_64
    - MySQL-python
    - mysql-connector-python
  register: airflow_dbsetup
  notify:
      - restart postgresql
      - restart rabbitmq-server
      - restart mariadb

- name: Airflow | Install | Upgrade pip
  shell: "pip install --upgrade pip"

- name: Airflow | Install | Upgrade setuptools
  shell: "pip install --upgrade setuptools"

- name: Airflow | Install | Start mariadb
  systemd:
    name: mariadb
    state: started
    daemon_reload: yes
  become: yes

- name: Airflow | Install | Group dev
  yum:
      name: "@Development"
      state: latest

- name: Airflow | Install | Numpy
  pip:
     name: numpy
     state: latest
  become: yes

- name: Airflow | Install | cython
  pip:
     name: cython
     state: latest
  become: yes

- name: Airflow | Install | With pip
  pip:
     name: apache-airflow
     version: 1.10.0

- name: Airflow | Install | crypto
  pip:
     name: apache-airflow[crypto]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | hive
  pip:
     name: apache-airflow[hive]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | MySQL
  pip:
     name: apache-airflow[mysql]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | jdbc
  pip:
     name: apache-airflow[jdbc]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | password
  pip:
     name: apache-airflow[password]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | s3
  pip:
     name: apache-airflow[s3]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | slack
  pip:
     name: apache-airflow[slack]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | ssh
  pip:
     name: apache-airflow[ssh]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | Reinstall pip
  shell: "pip install --upgrade --force-reinstall pip==9.0.0"

- name: Airflow | Install | devel
  pip:
     name: apache-airflow[devel]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | MSSql
  pip:
     name: apache-airflow[mssql]
     version: 1.10.0
  register: airflow_install

- name: Airflow | Install | Celery
  pip:
     name: celery

- name: Airflow | Install | psycopg2
  pip:
     name: psycopg2

- name: Airflow | Install | psycopg2-binary
  pip:
     name: psycopg2-binary

- name: Airflow | Install | erlang
  yum:
      name: https://github.com/rabbitmq/erlang-rpm/releases/download/v20.1.7/erlang-20.1.7-1.el6.x86_64.rpm
      state: present

- name: Airflow | Install | socat
  yum:
     name: socat
     state: present

- name: Airflow | Install | Rabbitmq
  yum:
      name: https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.8/rabbitmq-server-3.7.8-1.el7.noarch.rpm
      state: present

database.yml

---
- name: Airflow | DB | Uninstall markupsafe
  pip:
    name: markupsafe
    state: absent

- name: Airflow | DB | Install markupsafe
  pip:
    name: markupsafe
    state: latest

- name: Airflow | DB | Set PostgreSQL environment variables
  template:
    src: postgres.sh.j2
    dest: /etc/profile.d/postgres.sh
    mode: 0644
  notify: restart postgresql

- name: Airflow | DB | Ensure PostgreSQL data directory exists
  file:
    path: "{{ postgresql_data_dir }}"
    owner: "{{ postgresql_user }}"
    group: "{{ postgresql_group }}"
    state: directory
    mode: 0700
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart postgresql

- name: Airflow | DB | Check if PostgreSQL database is initialized
  stat:
    path: "{{ postgresql_data_dir }}/PG_VERSION"
  register: file_exists

- name: Airflow | DB | Initialize PostgreSQL Database
  command: "{{ airflow_executable_pgsql }} initdb"
  when: not file_exists.stat.exists
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart postgresql

- name: Airflow | DB | Copy Postgresql hba file
  copy:
    src: ../templates/pg_hba.conf.j2
    dest: "{{ postgresql_data_dir }}/pg_hba.conf"
    owner: "{{ postgresql_user }}"
    group: "{{ postgresql_group }}"
    mode: 0600
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart postgresql

- name: Airflow | DB | Copy Postgresql config file
  copy:
    src: ../templates/postgresql.conf.j2
    dest: "{{ postgresql_data_dir }}/postgresql.conf"
    owner: "{{ postgresql_user }}"
    group: "{{ postgresql_group }}"
    mode: 0600
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart postgresql

- name: Airflow | DB | Restart PostgreSQL
  shell: "systemctl restart postgresql"
  become: yes
  become_method: sudo
  become_user: root

- name: Airflow | DB | Postgresql Create DB
  postgresql_db:
    name: airflow

- name: Airflow | DB | Postgresql User
  postgresql_user:
    db: airflow
    name: airflow
    password: airflow
    priv: "ALL"
    expires: infinity
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart postgresql

- name: Airflow | DB | Postgresql Privileges
  postgresql_privs:
    db: airflow
    objs: ALL_DEFAULT
    privs: ALL
    type: default_privs
    role: airflow
    grant_option: yes

- name: Airflow | DB | Restart RabbitMQ-Server
  shell: "systemctl restart rabbitmq-server"
  become: yes
  become_method: sudo
  become_user: root

- name: Airflow | DB | RabbitMQ Add v_host
  rabbitmq_vhost:
    name: af-host
    state: present

- name: Airflow | DB | RabbitMQ User
  rabbitmq_user:
    user: airflow
    password: airflow
    tags: airflow-user
    vhost: af-host
    configure_priv: .*
    read_priv: .*
    write_priv: .*
    state: present
    force: yes
  become: yes
  become_method: sudo
  become_user: root
  register: airflow_dbsetup
  notify:
    - restart rabbitmq-server

- name: Airflow | DB | Create MySQL DB
  mysql_db:
    name: airflow
    state: present

- name: Airflow | DB | MySQL user
  mysql_user:
    name: airflow
    password: airflow
    priv: '*.*:ALL,GRANT'
    state: present

#- name: CREATE USER
#  shell: sudo -i -u postgres psql -c "CREATE USER airflow WITH PASSWORD 'airflow';"

#- name: CREATE DATABASE
#  shell: sudo -i -u postgres psql -c "CREATE DATABASE airflow;"

#- name: GRANT PRIVILEGES ON DATABASE
#  shell: sudo -i -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;"

#- name: GRANT PRIVILEGES ON TABLES
#  shell: sudo -i -u postgres psql -c "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;"

config.yml


---
- name: Airflow | Config | Ensure airflow directories structure
  file:
    path: "{{ item }}"
    state: directory
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
  with_items:
    - "{{ airflow_logs_folder }}"
    - "{{ airflow_child_process_log_folder }}"
    - "{{ airflow_dags_folder }}"
    - "{{ airflow_plugins_folder }}"

- name: Airflow | Config | Copy gunicorn logrotate config
  template:
    src: gunicorn-logrotate.j2
    dest: /etc/logrotate.d/airflow
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0644
  become: yes
  become_method: sudo
  become_user: root

- name: Airflow | Config | Copy sample dag hello_world
  copy:
    src: "{{ airflow_home }}/cng-ansible/roles/airflow/files/cng-hello_world.py"
    dest: "{{ airflow_dags_folder }}/cng-hello_world.py"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0644
    remote_src: True

- name: Airflow | Config | Synchronization of DAGs
  synchronize:
    src: "{{ airflow_home }}/cng-ansible/roles/airflow/files/"
    dest: "{{ airflow_dags_folder }}"

- name: Airflow | Config | Install airflow environment file
  template:
    src: airflow-environment-file.j2
    dest: "{{ airflow_environment_file_folder }}/airflow"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640

- name: Airflow | Config | Copy basic airflow config file
  template:
    src: airflow.cfg.j2
    dest: "{{ airflow_home }}/airflow/airflow.cfg"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  register: airflow_config
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Initialize Airflow Database
  shell: "{{ airflow_executable }} initdb"
  args:
    chdir: "{{ airflow_home }}"
    executable: /bin/bash
  become: yes
  become_method: sudo
  become_user: "{{ airflow_user }}"

- name: Airflow | Config | Install webserver systemd unit file
  template:
    src: airflow-webserver.service.j2
    dest: /usr/lib/systemd/system/airflow-webserver.service
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  register: airflow_config
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Install scheduler systemd unit file
  template:
    src: airflow-scheduler.service.j2
    dest: /usr/lib/systemd/system/airflow-scheduler.service
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  register: airflow_config
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Install worker systemd unit file
  template:
    src: airflow-worker.service.j2
    dest: /usr/lib/systemd/system/airflow-worker.service
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  register: airflow_config
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Copy extra airflow config files (provided by playbooks)
  copy:
    src: "{{ item }}"
    dest: "{{ airflow_home }}/{{ item | basename }}"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  with_fileglob:
    - "{{ airflow_extra_conf_path }}/*"
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Copy extra airflow config templates (provided by playbooks)
  template:
    src: "{{ item }}"
    dest: "{{ airflow_home }}/{{ item | basename }}"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  with_fileglob:
    - "{{ airflow_extra_conf_template_path }}/*"
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Add variables from configuration file
  command: "{{ airflow_executable }} variables -s {{ item.key }} {{ item.value }}"
  environment:
    AIRFLOW_HOME: "{{ airflow_home }}"
  become: true
  become_user: "{{ airflow_user }}"
  with_items: "{{ airflow_admin_variables }}"
  tags:
    - skip_ansible_lint

- name: Airflow | Config | Add connections from configuration file
  command: "{{ airflow_executable }} connections -a {% for key, value in item.items() %}--{{ key }} '{{ value }}' {% endfor %}"
  environment:
    AIRFLOW_HOME: "{{ airflow_home }}"
  become: true
  become_user: "{{ airflow_user }}"
  with_items: "{{ airflow_admin_connections }}"
  tags:
    - skip_ansible_lint
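
For reference, the two loops above consume list variables whose items are key/value pairs (for variables) and dicts of CLI flags (for connections). A minimal sketch of what these variables might look like — the names of the tasks' loop variables are from the playbook, but the entries themselves are illustrative, not from the original:

```yaml
# Hypothetical entries, e.g. in the role's defaults/main.yml
airflow_admin_variables:
  - key: env
    value: production

# Each dict's keys become "--<key> '<value>'" flags on
# "airflow connections -a" (e.g. --conn_id, --conn_uri)
airflow_admin_connections:
  - conn_id: airflow_db
    conn_uri: mysql://airflow:airflow@localhost:3306/airflow
```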

service.yml

---

- name: Airflow | Services | Configure service
  systemd:
    name: "{{ item.key }}"
    state: "{{ item.value.state }}"
    enabled: "{{ item.value.enabled }}"
    daemon_reload: yes
  become: yes
  become_method: sudo
  become_user: root
  with_dict: "{{ airflow_services }}"
  when: item.value.enabled
  changed_when: false

health.yml

---

- name: Airflow | Health | DB Bug fix
  shell: "mysql -u root -e 'alter table airflow.task_instance add column executor_config varchar(15) after task_id;'"

- name: Airflow | Health | Status
  wait_for:
    host: localhost
    port: "{{ item }}"
    state: started        # wait until the port is open
    delay: 15             # wait 15 seconds before the first check (sec)
    timeout: 3            # give up after 3 seconds (sec)
  ignore_errors: yes
  with_items:
    - 8080

Error log while installing this on an AWS RHEL server:

TASK [../../roles/airflow : Airflow | Health | DB Bug fix] ********************************************************************************************************************
fatal: [127.0.0.1]: FAILED! => {"changed": true, "cmd": "mysql -u root -e 'alter table airflow.task_instance add column executor_config varchar(15) after task_id;'", "delta": "0:00:00.192266", "end": "2018-12-31 10:35:22.455342", "msg": "non-zero return code", "rc": 1, "start": "2018-12-31 10:35:22.263076", "stderr": "ERROR 1146 (42S02) at line 1: Table 'airflow.task_instance' doesn't exist", "stderr_lines": ["ERROR 1146 (42S02) at line 1: Table 'airflow.task_instance' doesn't exist"], "stdout": "", "stdout_lines": []}

I was following the link below to upgrade from 1.8 to 1.10.0:

https://airflow.apache.org/installation.html

Error after applying the suggestions:

    TASK [../../roles/airflow : Airflow | Config | Initialize Airflow Database] ***********************************************************************************************************************
fatal: [127.0.0.1]: FAILED! => {"changed": true, "cmd": "/usr/bin/airflow initdb", "delta": "0:00:00.202622", "end": "2018-12-31 16:15:59.082736", "msg": "non-zero return code", "rc": 1, "start": "2018-12-31 16:15:58.880114", "stderr": "Traceback (most recent call last):\n  File \"/usr/bin/airflow\", line 21, in <module>\n    from airflow import configuration\n  File \"/usr/lib/python2.7/site-packages/airflow/__init__.py\", line 35, in <module>\n    from airflow import configuration as conf\n  File \"/usr/lib/python2.7/site-packages/airflow/configuration.py\", line 506, in <module>\n    conf.read(AIRFLOW_CONFIG)\n  File \"/usr/lib/python2.7/site-packages/airflow/configuration.py\", line 280, in read\n    super(AirflowConfigParser, self).read(filenames)\n  File \"/usr/lib/python2.7/site-packages/backports/configparser/__init__.py\", line 705, in read\n    self._read(fp, filename)\n  File \"/usr/lib/python2.7/site-packages/backports/configparser/__init__.py\", line 1087, in _read\n    lineno)\nbackports.configparser.DuplicateSectionError: While reading from '/home/ec2-user/airflow/airflow.cfg' [line 60]: section u'core' already exists", "stderr_lines": ["Traceback (most recent call last):", "  File \"/usr/bin/airflow\", line 21, in <module>", "    from airflow import configuration", "  File \"/usr/lib/python2.7/site-packages/airflow/__init__.py\", line 35, in <module>", "    from airflow import configuration as conf", "  File \"/usr/lib/python2.7/site-packages/airflow/configuration.py\", line 506, in <module>", "    conf.read(AIRFLOW_CONFIG)", "  File \"/usr/lib/python2.7/site-packages/airflow/configuration.py\", line 280, in read", "    super(AirflowConfigParser, self).read(filenames)", "  File \"/usr/lib/python2.7/site-packages/backports/configparser/__init__.py\", line 705, in read", "    self._read(fp, filename)", "  File \"/usr/lib/python2.7/site-packages/backports/configparser/__init__.py\", line 1087, in _read", "    lineno)", 
"backports.configparser.DuplicateSectionError: While reading from '/home/ec2-user/airflow/airflow.cfg' [line 60]: section u'core' already exists"], "stdout": "", "stdout_lines": []}

New error log after implementing @kaxil's suggestion:

sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.ProgrammingError) (1146, "Table 'airflow.log' doesn't exist") [SQL: u'INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%s, %s, %s, %s, %s, %s, %s)'] [parameters: (datetime.datetime(2019, 1, 2, 10, 49, 11, 49590, tzinfo=<Timezone [UTC]>), None, None, 'cli_webserver', None, 'ec2-user', '{"full_command": "[\'/usr/bin/airflow\', \'webserver\']", "host_name": "ip-10-136-94-144.eu-central-1.compute.internal"}')]

Answer 1:

In your config.yml file, reorder the two tasks (Airflow | Config | Initialize Airflow Database and Airflow | Config | Copy basic airflow config file) so that they run in this order:

  1. Airflow | Config | Copy basic airflow config file
  2. Airflow | Config | Initialize Airflow Database

Basically, your airflow.cfg.j2 file should contain the metadata database connection string, e.g. sql_alchemy_conn = my_conn_string, in the [core] section, as described in https://airflow.apache.org/howto/set-config.html#setting-configuration-options (double-check this).

Once the config file has been copied and initdb has run, all the tables Airflow needs will have been created.
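
A minimal sketch of the relevant [core] entries in airflow.cfg.j2, assuming the airflow/airflow MySQL credentials and database created earlier in database.yml (paths and executor choice are illustrative):

```ini
[core]
# Illustrative paths; match these to your airflow_home variable
airflow_home = /home/ec2-user/airflow
dags_folder = /home/ec2-user/airflow/dags

# Metadata DB connection string; user, password, and db name match
# the mysql_db/mysql_user tasks in database.yml (assumed local MySQL)
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

# Assumption: CeleryExecutor, since celery and RabbitMQ are installed
executor = CeleryExecutor
```

Note also that the DuplicateSectionError in the log above ("section u'core' already exists") indicates the rendered airflow.cfg contains two [core] sections; make sure the section appears exactly once in the template.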

- name: Airflow | Config | Copy basic airflow config file
  template:
    src: airflow.cfg.j2
    dest: "{{ airflow_home }}/airflow/airflow.cfg"
    owner: "{{ airflow_user }}"
    group: "{{ airflow_group }}"
    mode: 0640
  register: airflow_config
  notify:
    - restart airflow-webserver
    - restart airflow-scheduler
    - restart airflow-worker

- name: Airflow | Config | Initialize Airflow Database
  shell: "{{ airflow_executable }} initdb"
  args:
    chdir: "{{ airflow_home }}"
    executable: /bin/bash
  become: yes
  become_method: sudo
  become_user: "{{ airflow_user }}"