Email on failure using AWS SES in Apache Airflow DAG
I am trying to have Airflow email me through AWS SES whenever a task in my DAG fails or retries. I am using my AWS SES credentials rather than my general AWS credentials.

My current airflow.cfg:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = [email protected]

Current task in my DAG that is designed to intentionally fail and retry:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3,
    retry_delay=timedelta(seconds=15),
    python_callable=add_library_to_cluster,
    params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path': 's3000://fakepath.jar'},
    dag=dag,
    email_on_failure=True,
    email_on_retry=True,
    email='[email protected]',
    provide_context=True
)

Everything works as designed: the task retries the set number of times and ultimately fails, except no emails are being sent. I have checked the logs for the task mentioned above too, and smtp is never mentioned.

I've looked at the similar question here, but the only solution there did not work for me. Additionally, Airflow's documentation, such as their example here, does not seem to work for me either.

Does SES work with Airflow's email_on_failure and email_on_retry functions?

What I am currently considering is using the on_failure_callback function to call a Python script provided by AWS here to send an email on failure, but that is not the preferred route at this point.
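For reference, a minimal sketch of what that fallback could look like, assuming boto3 is available; the region and addresses below are placeholders, not taken from the AWS script:

import boto3

def notify_failure(context):
    # Sends the failure alert through the SES API directly,
    # bypassing Airflow's [smtp] settings entirely.
    ti = context['task_instance']
    ses = boto3.client('ses', region_name='us-east-1')  # placeholder region
    ses.send_email(
        Source='[email protected]',  # must be an SES-verified address
        Destination={'ToAddresses': ['[email protected]']},
        Message={
            'Subject': {'Data': 'Airflow failure: %s.%s' % (ti.dag_id, ti.task_id)},
            'Body': {'Text': {'Data': 'Execution date: %s' % context['execution_date']}},
        },
    )

It would be wired up with on_failure_callback=notify_failure on the task or in default_args.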

Thank you, appreciate any help.

Rhea answered 1/6, 2018 at 14:22 Comment(2)
Have you verified the "from" address with SES? – Immensurable
@Immensurable yes, both from and to are verified. The "from" address is our monitoring account's email, and we already receive updates from it, so it is confirmed to be working also. – Rhea

--updated 6/8 with working SES

Here's my write-up on how we got it all working. There is a small summary at the bottom of this answer.

Couple of big points:

  1. We originally decided not to use Amazon SES and used sendmail instead; we now have SES up and working (see the update at the bottom).
  2. It is the airflow worker that services the email_on_failure and email_on_retry features. You can run journalctl -u airflow-worker -f to monitor it during a DAG run. On your production server, you do NOT need to restart your airflow-worker after changing your airflow.cfg with new smtp settings; they are picked up automatically, so there is no need to worry about disrupting currently running DAGs.

Here is the technical write-up on how to use sendmail:

Since we changed from SES to sendmail on localhost, we had to change our smtp settings in airflow.cfg.

The new config is:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = [email protected]

This works in both production and local airflow instances.
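A quick way to smoke-test this config without waiting for a task to fail is to call Airflow's own email helper from a Python shell on the worker (a sketch; the recipient address is a placeholder):

from airflow.utils.email import send_email

# Uses the [email]/[smtp] sections of airflow.cfg, so a delivered
# message confirms the whole chain, not just the SMTP server.
send_email(
    to='[email protected]',
    subject='Airflow SMTP smoke test',
    html_content='If this arrives, the [smtp] config works.',
)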

Some common errors you might receive if your config is not like mine above:

  • socket.error: [Errno 111] Connection refused -- you must change your smtp_host line in airflow.cfg to localhost
  • smtplib.SMTPException: STARTTLS extension not supported by server. -- you must change your smtp_starttls in airflow.cfg to False
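If it is unclear which case you are in, a short smtplib probe outside of Airflow (a sketch, using the same localhost settings as above) reproduces both errors:

import smtplib

try:
    # Same host/port as in airflow.cfg above
    server = smtplib.SMTP('localhost', 25, timeout=10)
except OSError as exc:  # socket.error is an alias of OSError in Python 3
    print('Connection refused/timed out -- check smtp_host:', exc)
else:
    try:
        server.starttls()  # raises SMTPException if STARTTLS is unsupported
        print('STARTTLS supported')
    except smtplib.SMTPException as exc:
        print('No STARTTLS -- set smtp_starttls = False:', exc)
    finally:
        server.quit()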

In my local testing, I tried to simply force airflow to show a log of what was going on when it tried to send an email, so I created a fake DAG as follows:

# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# General imports
from datetime import datetime

def throwerror():
    # Always raises, so the task fails and should trigger an alert email
    raise ValueError("Failure")

args = {
    'owner': 'me',
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 24),
    'end_date': datetime(2018, 6, 28),
}

dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *",
)

# Placeholder upstream task
t1 = DummyOperator(
    task_id='extract_data',
    dag=dag,
)

# Task that always fails
t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror,
)

t2.set_upstream(t1)

If you run journalctl -u airflow-worker -f, you can see the worker report that it has sent an alert email on the failure to the address in your DAG, but we were still not receiving the email. We then decided to look into sendmail's mail logs by running cat /var/log/maillog. We saw entries like this:

Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<[email protected]>, size=1297, nrcpt=1 (queue active)
Jun  5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun  5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

So this is probably the biggest "Oh duh" moment. Here we are able to see what is actually going on in our smtp service. We used telnet to confirm that we were not able to connect to Gmail's mail servers on port 25.

We determined that the email was attempting to be sent, but that the sendmail service was unable to connect to those IP ranges.
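The same connectivity check works without telnet; a minimal sketch in Python, using the MX host from the log above:

import socket

# Can this machine reach the destination mail server on port 25 at all?
try:
    socket.create_connection(('aspmx.l.google.com', 25), timeout=10).close()
    print('Outbound port 25 is open')
except OSError as exc:
    print('Blocked -- likely a security group or firewall rule:', exc)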

We decided to allow all outbound traffic on port 25 in AWS (as our airflow production environment is an EC2 instance), and it now works successfully. We are now able to receive emails on failures and retries. (Tip: email_on_failure and email_on_retry default to True per the DAG API reference, so you do not need to put them in your args, but it is still good practice to state True or False explicitly; a sketch follows below.)
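Being explicit about those flags in default_args looks like this (a sketch; the address is a placeholder):

default_args = {
    'owner': 'me',
    'email': ['[email protected]'],   # placeholder
    'email_on_failure': True,  # the default, stated for clarity
    'email_on_retry': True,    # the default, stated for clarity
}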

SES now works. Here is the airflow config:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
# smtp_mail_from must be a verified SES email
smtp_mail_from = [email protected]
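
To sanity-check the SES SMTP credentials outside of Airflow, a smtplib probe works against the SES endpoint too (a sketch; host, user, password, and addresses are the placeholders from the config above):

import smtplib

server = smtplib.SMTP('emailsmtpserver.region.amazonaws.com', 587, timeout=10)
server.starttls()  # SES requires TLS on port 587
server.login('REMOVEDAWSACCESSKEY', 'REMOVEDAWSSECRETACCESSKEY')
server.sendmail(
    '[email protected]',    # must be SES-verified
    ['[email protected]'],
    'Subject: SES SMTP test\n\nSent with the same credentials airflow uses.',
)
server.quit()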

Thanks!

Rhea answered 5/6, 2018 at 15:22 Comment(0)

Similar case here: I tried to follow the same debugging process but got no log output. Also, the outbound rule for my airflow EC2 instance is open to all ports and IPs, so the cause must be something else.

I noticed that when you create the SMTP credential from SES, it also creates an IAM user. I am not sure how airflow is running in your case (bare metal on an EC2 instance or wrapped in containers), or how that user's access is set up.
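If nothing shows up in the logs, one way to rule out the SES/IAM side is to query SES directly (a sketch, assuming boto3 and that the instance role or configured keys are allowed to call ses:GetSendQuota; the region is a placeholder):

import boto3

# Succeeding at all proves the credentials can reach SES; the returned
# Max24HourSend / SentLast24Hours also expose sandbox and quota limits.
ses = boto3.client('ses', region_name='us-east-1')  # placeholder region
print(ses.get_send_quota())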

Ashworth answered 6/5, 2021 at 6:55 Comment(0)
