Can I programmatically determine if an Airflow DAG was scheduled or manually triggered?
Asked Answered
F

5

15

I want to create a snippet that passes the correct date based on whether the DAG was scheduled or whether it was triggered manually. The DAG runs monthly. The DAG generates a report (A SQL query) based on the data of the previous month.

If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:

execution_date.month

given that the DAG is scheduled at the end of the previous period (last month) the execution_date will correctly return the last month. However on manual runs this will return the current month (execution date will be the date of the manual trigger).

I want to write a simple macro that deals with this case. However I could not find a good way to programmatically query whether the DAG is triggered programmatically. The best I could come up with is to fetch the run_id from the database (by creating a macro that has a DB session), check wheter the run_id contains the word manual. Is there a better way to solve this problem?

Funeral answered 5/2, 2020 at 14:5 Comment(0)
B
8

There is no direct DAG property to identify manual runs for now. To get this information you would need to check the run_id as you mentioned.

However, there is a dedicated macro get the run_id. You don't have to fetch it from the database by yourself. Here is an example on how to use it :

    def some_task_py(**context):
        run_id = context['templates_dict']['run_id']
        is_manual = run_id.startswith('manual__')
        is_scheduled = run_id.startswith('scheduled__')


    some_task = PythonOperator(
                task_id = 'some_task',
                dag=dag,
                templates_dict = {'run_id': '{{ run_id }}'},
                python_callable = some_task_py,
                provide_context = True)
Blaeberry answered 5/2, 2020 at 15:9 Comment(3)
Shame, indeed what I was afraid of. The run_id macro variable is a good tip indeed, I forgot that existed.Funeral
@Funeral Out of curiosity, what do you feel are the flaws of this method?Painless
@Painless Ideally there would be something provided by Airflow. If 10 versions down the line they would decide to change the run_id generation mechanism an airflow provided mechanism would still work while this would break. I am not saying it would happen, but there is no contract in this case between me and Airflow.Funeral
P
16

tl;dr: You can determine this with DagRun.external_trigger.


I noticed that in the Tree View, there's an outline around runs that are scheduled, but not manual. That's because the latter has stroke-opacity: 0; applied in CSS.

Searching the repo for this, I found how Airflow devs detect manual runs (5 year old line, so should work in older version as well):

.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})

Searching for external_trigger brings us to the DagRun definition.

So if you were using, for example, a Python callback, you can have something like this (can be defined in the DAG, or a separate file):

def my_fun(context):
    if context.get('dag_run').external_trigger:
        print('manual run')
    else:
        print('scheduled run')

and in your Operator set the parameter like:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    on_failure_callback=my_fun,
    dag=dag,
)

I have tested something similar and it works.

I think you can also do something like if if {{ dag_run.external_trigger }}: - but I haven't tested this, and I believe it would only work in that DAG's file.

Painless answered 1/5, 2020 at 21:29 Comment(0)
B
8

There is no direct DAG property to identify manual runs for now. To get this information you would need to check the run_id as you mentioned.

However, there is a dedicated macro get the run_id. You don't have to fetch it from the database by yourself. Here is an example on how to use it :

    def some_task_py(**context):
        run_id = context['templates_dict']['run_id']
        is_manual = run_id.startswith('manual__')
        is_scheduled = run_id.startswith('scheduled__')


    some_task = PythonOperator(
                task_id = 'some_task',
                dag=dag,
                templates_dict = {'run_id': '{{ run_id }}'},
                python_callable = some_task_py,
                provide_context = True)
Blaeberry answered 5/2, 2020 at 15:9 Comment(3)
Shame, indeed what I was afraid of. The run_id macro variable is a good tip indeed, I forgot that existed.Funeral
@Funeral Out of curiosity, what do you feel are the flaws of this method?Painless
@Painless Ideally there would be something provided by Airflow. If 10 versions down the line they would decide to change the run_id generation mechanism an airflow provided mechanism would still work while this would break. I am not saying it would happen, but there is no contract in this case between me and Airflow.Funeral
A
4

In Airflow 2.X we can check the run type with below parameters..

  1. Using Template : {{ dag_run.run_type }} For Example, run_type = '{{ "manual" if dag_run.run_type=="manual" else "schedule" }}'

  2. Using python Operator:

def run_type_check(**context): run_type = context['dag_run'].run_type if run_type=="manual": print "Manual Run.." else: print "Schedule Run"

I hope this helps

Akerboom answered 31/7, 2023 at 1:34 Comment(0)
E
3

Following to @Donentolon answer, I've managed to determine if the DAG was triggered manually or scheduled, by getting the dag_run from kwargs in python_callable (of my PythonOperator):

def my_python_callable(**kwargs):
    dag_run = kwargs.get("dag_run")

    if dag_run.external_trigger:
        logger.info("DAG triggered manually, skipping this operator")
        return True

    # my operator logic for scheduled run
Effy answered 19/8, 2021 at 7:56 Comment(0)
D
0

I needed to be able to detect if something was scheduled or triggered, including runs from the command line using airflow tasks test (or the old airflow test).

My parameter to the tasks is **kwargs and not context like def some_task_py(**context) so my example uses kwargs.

If you run from the command line I believe kwargs['dag_run'] will be None and kwargs['templates_dict']['run_id'] will not exist.

I have tested and this should work from a command line run or from the web server scheduled or manually triggered:

if kwargs['dag_run'] == None or (kwargs['dag_run'] != None and kwargs['dag_run'].external_trigger):
    print("This is an external run.  Mark it as such")
else:
    print("This is a scheduled run")
Descend answered 31/10, 2021 at 16:51 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.