I have a use case in which I download some JSON files and parse them. Depending on which files are downloaded, the program needs to populate data in different tables. Once the data is loaded into the tables, an email notification must be sent.
For example, if the program needs to populate tables a and b (obtained from table_list), the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email.
If tables a, b, c and d are obtained from table_list, the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email.
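To make the desired shape concrete, here is a framework-free sketch (plain Python, no Airflow) that lists the edges of the download >> parse >> [loads] >> send_email graph for any table_list. The helper name workflow_edges is hypothetical, and so is the choice to link parse straight to send_email when the list is empty; adjust that to the real requirement.

```python
def workflow_edges(table_list):
    """Return (upstream, downstream) task_id pairs for the desired workflow.

    Hypothetical helper: it models the fan-out/fan-in shape only, not Airflow.
    """
    load_ids = ['load_table_{}'.format(t) for t in table_list]
    edges = [('download', 'parse')]
    if not load_ids:
        # Assumption: with no tables to load, still notify after parsing.
        edges.append(('parse', 'send_email'))
        return edges
    for load_id in load_ids:
        edges.append(('parse', load_id))       # fan out from parse
        edges.append((load_id, 'send_email'))  # fan back in to the email
    return edges


print(workflow_edges(['a', 'b']))
# [('download', 'parse'), ('parse', 'load_table_a'), ('load_table_a', 'send_email'),
#  ('parse', 'load_table_b'), ('load_table_b', 'send_email')]
```

With four tables the same function yields nine edges: one download >> parse edge plus two per load task.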
Here is what I have tried so far. Can someone please help?
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf

default_args = {
    'start_date': datetime(2021, 5, 18)
}

# A dag_id may only contain alphanumerics, dashes, dots and underscores,
# so 'Test DAG' (with a space) is rejected.
with DAG(
    'test_dag',
    default_args=default_args,
    catchup=False
) as dag:
    # The _task suffix keeps these from shadowing the imported functions.
    download_task = PythonOperator(
        task_id='download',
        python_callable=download,
        email_on_failure=True,
        email='[email protected]'
    )
    parse_task = PythonOperator(
        task_id='parse',
        python_callable=parse,
        email_on_failure=True,
        email='[email protected]'
    )

    # One load task per table. get_table_list() runs every time the
    # scheduler parses this file.
    table_list = get_table_list()
    load_tasks = [
        PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            email_on_failure=True,
            email='[email protected]',
            op_kwargs={'category': table}
        )
        for table in table_list
    ]

    send_email = EmailOperator(
        task_id='send_email',
        to=['[email protected]'],
        subject='Airflow: Success',
        html_content='DAG run completed successfully.'
    )

    # Fan out from parse to every load task, then fan back in to send_email.
    download_task >> parse_task >> load_tasks >> send_email
```
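Why the last line works, and one edge case worth guarding against, can be seen in a toy model of the >> composition. ToyTask is a hypothetical stand-in written for illustration, not Airflow's BaseOperator; it only mimics two behaviours: `a >> b` wires the edge and returns `b`, and `[t1, t2] >> task` falls back to `task.__rrshift__` because Python lists do not define >>.

```python
class ToyTask:
    """Hypothetical stand-in that mimics an operator's >> behaviour."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def set_downstream(self, other):
        # Accept a single task or a list of tasks.
        others = other if isinstance(other, list) else [other]
        for task in others:
            task.upstream.add(self.task_id)

    def __rshift__(self, other):
        # task >> other: wire the edge(s), then return `other` so the chain continues.
        self.set_downstream(other)
        return other

    def __rrshift__(self, other):
        # [t1, t2] >> task: lists have no >>, so Python calls this instead.
        others = other if isinstance(other, list) else [other]
        for task in others:
            task.set_downstream(self)
        return self


download_task = ToyTask('download')
parse_task = ToyTask('parse')
load_tasks = [ToyTask('load_table_a'), ToyTask('load_table_b')]
send_email = ToyTask('send_email')
download_task >> parse_task >> load_tasks >> send_email
print(sorted(send_email.upstream))  # ['load_table_a', 'load_table_b']

# Edge case: an empty list silently drops the edge into the email task.
lonely_email = ToyTask('send_email')
ToyTask('parse') >> [] >> lonely_email
print(lonely_email.upstream)  # set()
```

In the model, an empty list leaves send_email with no upstream at all. If get_table_list() can ever return an empty list, the real DAG may end up the same way, so wiring parse_task >> send_email directly in that case may be worth considering.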
get_table_list() will be evaluated every time the DAG file is parsed, so if it's a heavy process (DB query, HTTP requests, etc.) it will impact Airflow scheduler performance. In such cases it's recommended to consider some form of lazy evaluation. – Sunfish
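One possible form of lazy evaluation, sketched under assumptions: cache the result of the expensive lookup in a small JSON file and recompute it only when the file is older than a TTL. Because a DAG file may be parsed in a fresh process each time, an in-memory cache such as functools.lru_cache would likely not survive between parses, whereas a file on the scheduler's disk does. The names cached_table_list, CACHE_PATH and TTL_SECONDS are hypothetical, and the fetch argument stands in for get_table_list from the question.

```python
import json
import os
import tempfile
import time

# Hypothetical cache location and lifetime; tune for your deployment.
CACHE_PATH = os.path.join(tempfile.gettempdir(), 'table_list_cache.json')
TTL_SECONDS = 300


def cached_table_list(fetch, path=CACHE_PATH, ttl=TTL_SECONDS):
    """Return fetch()'s result, re-running fetch only when the cache file is stale.

    `fetch` is assumed to be a zero-argument callable returning a
    JSON-serialisable list (e.g. get_table_list from the question).
    """
    try:
        if time.time() - os.path.getmtime(path) < ttl:
            with open(path) as f:
                return json.load(f)
    except (OSError, ValueError):
        pass  # missing or corrupt cache file: fall through and refresh it

    tables = fetch()
    # Write to a unique temp file in the same directory, then atomically
    # replace, so a concurrent reader never sees a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    with os.fdopen(fd, 'w') as f:
        json.dump(tables, f)
    os.replace(tmp_path, path)
    return tables
```

In the DAG file this would replace the direct call, e.g. table_list = cached_table_list(get_table_list). The trade-off is that a newly added table only shows up as a task once the TTL has expired.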