I’m currently trying to use Airflow to orchestrate a process where some operators are defined dynamically and depend on the output of another (earlier) operator.
In the code below, t1 updates a text file with new records (these are actually read from an external queue, but for simplicity I hard-coded them as A, B and C here). Then I want to create a separate operator for each record read from that text file. These operators will create directories A, B and C, respectively, and in the Airflow UI they will show up as separate bash tasks Create_directory_A, Create_directory_B and Create_directory_C.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)
def create_text_file(list_of_rows):
    # Write one record per line (in reality the records come from an external queue).
    with open('text_file.txt', 'w') as text_file:
        for row in list_of_rows:
            text_file.write(row + '\n')

def read_text():
    # Strip the trailing newlines so the records can be used in task_ids and paths.
    with open('text_file.txt', 'r') as txt_file:
        return [line.strip() for line in txt_file.readlines()]
t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)
for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )
    t1 >> t2
In Airflow’s documentation I can see that the scheduler will execute it [the DAG file] periodically to reflect any changes. Does that mean there is a risk that, even though my t1 operator executes before t2, the bash operators are created from the list of records as it was before the update (since that is when the DAG file was evaluated)?
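For completeness, this is the variant of read_text() I was considering so that the DAG file still parses before text_file.txt exists (just a rough sketch on my side; returning an empty list as a fallback is my own assumption, not something from the docs):

def read_text():
    # This function runs whenever the scheduler parses the DAG file,
    # i.e. independently of t1 actually executing, so it should not
    # blow up if the file has not been created yet.
    try:
        with open('text_file.txt', 'r') as txt_file:
            return [line.strip() for line in txt_file.readlines()]
    except IOError:
        # File not created yet at parse time: no records means
        # no Create_directory_* tasks are generated on this parse.
        return []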