How do I setup Airflow's email configuration t

2019-02-18 09:54发布

问题:

I'm trying to make an Airflow task intentionally fail and error out by passing in a Bash line (thisshouldnotrun) that doesn't work. Airflow is outputting the following:

[2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found
[2017-06-15 17:44:17,869] {bash_operator.py:97} INFO - Command exited with return code 127
[2017-06-15 17:44:17,869] {models.py:1417} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2017-06-15 17:44:17,871] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
[2017-06-15 17:44:17,878] {models.py:1462} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

Will Airflow send an email for these kind of errors? If not, what would be the best way to send an email for these errors?

I'm not even sure if airflow.cfg is setup properly... Since the ultimate goal is to test the email alerting notification, I want to make sure airflow.cfg is setup properly. Here's the setup:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow_data_user
# smtp_password = password
smtp_port = 587 
smtp_mail_from = airflow_data_user@domain.com

What is smtp_starttls? I can't find any info for it in the documentation or online. If we have 2-factor authentication needed to view emails, will that be an issue here for Airflow?

Here's my Bash command:

task1_bash_command = """
export PATH=/home/ubuntu/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin
export rundate=`TZ='America/Los_Angeles' date +%F -d "yesterday"`
export AWS_CONFIG_FILE="/home/ubuntu/.aws/config"

/home/ubuntu/bin/snowsql -f //home/ubuntu/sql/script.sql 1> /home/ubuntu/logs/"$rundate"_dev.log 2> /home/ubuntu/logs/"$rundate"_error_dev.log

if [ -e /home/ubuntu/logs/"$rundate"_error_dev.log ]
then
    exit 64
fi

And my task:

task1 = BashOperator(
    task_id = 'run_bash',
    bash_command = task1_bash_command,
    dag = dag,
    retries = 2,
    email_on_failure = True,
    email = 'username@domain.com')

回答1:

smtp_starttls basically means Use TLS

Set this to False and set smtp_ssl to True if you want to use SSL instead. You probably need smtp_user and smtp_password for either.

Airflow will not handle 2 step authentication. However, is you are using AWS you likely don't need it as your SMTP (SES) credentials are different from your AWS credentials.

See here.

EDIT: For airflow to send an email on failure, there are a couple things that need to be set on your task, email_on_failure and email.

See here for example:

def throw_error(**context):
    raise ValueError('Intentionally throwing an error to send an email.')



t1 = PythonOperator(task_id='throw_error_and_email',
                    python_callable=throw_error,
                    provide_context=True,
                    email_on_failure=True,
                    email='your.email@whatever.com',
                    dag=dag)