Airflow won't write logs to s3

Posted 2020-07-13 09:27

Question:

I tried several ways of configuring Airflow 1.9 to write logs to S3, but it simply ignores the settings. I found many people who have trouble reading the logs after setting this up; my problem is different: the logs stay local. I can read them without any problem, but they never appear in the specified S3 bucket.

The first thing I tried was adding this to the airflow.cfg file:

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

Then I tried setting environment variables:

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

However, these settings are ignored and the log files remain local.

I run Airflow in a container. I adapted https://github.com/puckel/docker-airflow to my case, but it won't write logs to S3. I use the aws connection to write to buckets in DAGs and that works, but the logs stay local, whether I run it on an EC2 instance or locally on my machine.

Answer 1:

I finally found an answer via https://stackoverflow.com/a/48969421/3808066, which covers most of the work; I then had to add one more step. I reproduce that answer here, adapted slightly to the way I did it:

Some things to check:

  1. Make sure you have the log_config.py file and that it is in the correct directory: ./config/log_config.py.
  2. Make sure you didn't forget the __init__.py file in that directory.
  3. Make sure you defined the s3.task handler and set its formatter to airflow.task.
  4. Make sure you set the handlers of the airflow.task and airflow.task_runner loggers to s3.task.
  5. Set task_log_reader = s3.task in airflow.cfg.
  6. Pass the S3_LOG_FOLDER to log_config. I did that with a configuration variable, retrieved as in the following log_config.py.
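
Items 3 and 4 are easy to get subtly wrong, so a small sanity check can help. The following is only a sketch: check_s3_logging_config is a hypothetical helper, not part of Airflow, and it only inspects a plain dict in the shape used by the log_config.py in this answer.

```python
def check_s3_logging_config(config, handler_name="s3.task"):
    """Return a list of problems found; an empty list means the checks pass.

    Hypothetical helper (not an Airflow API). It covers checklist items 3 and 4 only.
    """
    problems = []
    handlers = config.get("handlers", {})
    loggers = config.get("loggers", {})

    # Item 3: the handler must exist and use the airflow.task formatter.
    handler = handlers.get(handler_name)
    if handler is None:
        problems.append("handler %r is not defined" % handler_name)
    elif handler.get("formatter") != "airflow.task":
        problems.append("handler %r should use formatter 'airflow.task'" % handler_name)

    # Item 4: both task loggers must be wired to that handler.
    for logger_name in ("airflow.task", "airflow.task_runner"):
        attached = loggers.get(logger_name, {}).get("handlers", [])
        if handler_name not in attached:
            problems.append(
                "logger %r does not use handler %r" % (logger_name, handler_name)
            )
    return problems
```

Calling check_s3_logging_config(LOGGING_CONFIG) on the dict from the log_config.py in this answer should return an empty list.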

Here is a log_config.py that works:

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

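# S3 destination for task logs, read from airflow.cfg ([core] section)
# or from the AIRFLOW__CORE__S3_LOG_FOLDER environment variable.
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')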

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

Note that this way S3_LOG_FOLDER can be specified in airflow.cfg or as the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
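
The environment variable name follows the AIRFLOW__{SECTION}__{KEY} pattern already used in the question. As a rough illustration of that naming convention only (airflow_env_var_name and lookup are hypothetical helpers, not Airflow functions, and this does not reproduce Airflow's full config lookup):

```python
import os


def airflow_env_var_name(section, key):
    """Build the AIRFLOW__{SECTION}__{KEY} name for a config option."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


def lookup(section, key, environ=None, default=None):
    """Look the option up in an environment mapping (os.environ by default)."""
    environ = os.environ if environ is None else environ
    return environ.get(airflow_env_var_name(section, key), default)
```

For example, lookup('core', 'S3_LOG_FOLDER') reads AIRFLOW__CORE__S3_LOG_FOLDER from the current environment, which is handy for confirming that the variable is actually set inside the container.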



Answer 2:

One more thing that leads to this behavior (Airflow 1.10):

If you look at airflow.utils.log.s3_task_handler.S3TaskHandler, you'll notice that there are a few conditions under which the logs are silently not written to S3:

1) The logger instance is already close()d (not sure how this happens in practice)
2) The log file does not exist on the local disk (this is how I got to this point)

You'll also notice that the logger runs in a multiprocessing/multithreading environment, and that Airflow's S3TaskHandler and FileTaskHandler do some risky things with the filesystem. If their assumptions about log files on disk are not met, the S3 log files are not written, and nothing is logged or raised about it. If you have specific, well-defined logging needs, it might be a good idea to implement your own logging handlers (see the Python logging docs) and disable all Airflow log handlers (see Airflow's UPDATING.md).
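
As a starting point for a handler of your own, here is a minimal sketch built only on the standard logging module. BufferedUploadHandler and its upload callable are hypothetical names; the upload function is assumed to be something you supply (for example a thin wrapper around an S3 client). Airflow's task handlers also implement methods such as set_context and read, which this sketch does not cover.

```python
import logging


class BufferedUploadHandler(logging.Handler):
    """Keep formatted records in memory and upload them once on close().

    `upload` is a caller-supplied callable taking (key, text); it is an
    assumption of this sketch, not an Airflow or boto API.
    """

    def __init__(self, key, upload):
        super().__init__()
        self.key = key
        self.upload = upload
        self._lines = []
        self._uploaded = False

    def emit(self, record):
        # No local file is involved, so nothing depends on the disk state.
        self._lines.append(self.format(record))

    def close(self):
        try:
            # Upload exactly once; errors from upload() propagate to the
            # caller instead of being swallowed.
            if not self._uploaded:
                self._uploaded = True
                self.upload(self.key, "\n".join(self._lines))
        finally:
            super().close()
```

Buffering in memory avoids the dependency on a local log file, at the cost of losing the log if the process is killed before close() runs.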



Answer 3:

Another thing that may lead to this behaviour: botocore may not be installed. When installing Airflow, make sure to include the s3 extra: pip install apache-airflow[s3]
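
A quick way to check this from inside the Airflow environment is to ask Python whether the module can be found. This is a small sketch using only the standard library; missing_modules is a hypothetical helper name.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

Running missing_modules(['botocore']) in the same interpreter that runs Airflow returns an empty list when the package is available.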