How to get last two successful execution dates of Airflow job?
I need to get the last two successful execution dates of an Airflow job to use in my current run. Example:

Execution date   Job status
2020-05-03       success
2020-05-04       fail
2020-05-05       success

Question: when I run my job on May 6th, I should get the values of May 3rd and May 5th into variables. Is that possible?

Cahoot answered 8/5, 2020 at 3:40 Comment(1)
For readers of this question, it is also worth checking [1] Apache airflow macro to get last dag run execution time, [2] Getting the date of the most recent successful DAG execution, or this search result – Yarkand

You can leverage SQLAlchemy magic to retrieve the execution_dates of the last 'n' successful runs:

from pendulum import Pendulum
from typing import List, Optional
from airflow.utils.db import provide_session  # airflow.utils.session in Airflow 2.x
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """
    Queries the Airflow metadata database and returns the
    execution dates of the most recent successful runs.
    Args:
        dag_id: DAG name
        task_id: task name
        n: number of runs
        session: SQLAlchemy session connected to the Airflow metadata DB
    Returns:
        list of execution dates of the most recent n successful runs
    """
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates: List[Pendulum] = [ti.execution_date for ti in query_val]
    return execution_dates

# The above function can be used as a utility and wrapped with the
# provide_session decorator, which injects the session argument:

last_success_date_fn = provide_session(last_execution_date)
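
For completeness, a minimal usage sketch (the dag_id / task_id values and the variable names are mine, not from the answer): calling the wrapped function with n=2 pulls the last two successful execution dates into variables, as the question asks.

# Usage sketch; 'my_dag_id' / 'my_task_id' are placeholder names.
dates = last_success_date_fn(dag_id="my_dag_id", task_id="my_task_id", n=2)
if len(dates) == 2:
    latest, previous = dates  # e.g. 2020-05-05 and 2020-05-03 in the question's example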

This snippet is tested end to end and can be used in prod.

I referred to the tree() method of Airflow's views.py when writing this script.


Alternatively, you can run this SQL query directly against Airflow's metadata DB to retrieve the execution dates of the last n successful runs:

SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
  AND task_id = 'my_task_id'
  AND state = 'success'
ORDER BY execution_date DESC
LIMIT n
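
If you would rather run that SQL from Python than from a SQL client, here is a minimal sketch using SQLAlchemy's text() with Airflow's session (it assumes the pre-2.2 schema, where task_instance still has an execution_date column; the dag_id / task_id values are placeholders):

from sqlalchemy import text
from airflow.settings import Session

# Sketch: run the raw SQL through Airflow's SQLAlchemy session.
session = Session()
rows = session.execute(
    text(
        "SELECT execution_date FROM task_instance "
        "WHERE dag_id = :dag_id AND task_id = :task_id AND state = 'success' "
        "ORDER BY execution_date DESC LIMIT :n"
    ),
    {"dag_id": "my_dag_id", "task_id": "my_task_id", "n": 2},
)
execution_dates = [row[0] for row in rows]
session.close()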
Yarkand answered 8/5, 2020 at 22:37 Comment(1)
I tried to run the above piece of code, but it seems to generate a query that has a syntax error at this place: ORDER BY EXISTS ( SELECT 1 FROM dag_run WHERE dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id AND dag_run.execution_date DESC ). What should the solution to such a problem be? – Vassily

If you are getting a syntax error in the generated queries from these examples, I managed to get this working using the following:

from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

def last_execution_date(dag_id: str, task_id: str, n: int):
    session = Session()
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.end_date.desc())
        .limit(n)
    )
    execution_dates = [ti.execution_date for ti in query_val]
    return execution_dates

The problem with ordering on execution_date is that it necessitates a join to the DagRun model to pick up execution_date. I have instead ordered by a column present on the TaskInstance model itself. This code assumes that every task with a state of State.SUCCESS has an end_date, and that end_date accurately orders the tasks in execution order.
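
If you do need the true execution_date ordering, a minimal sketch of the explicit join (assuming Airflow 2.2+ models; the function name and structure here are my own, not from the answer) that sidesteps the association-proxy ORDER BY problem:

from sqlalchemy import and_
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun
from airflow.utils.state import State

def last_execution_dates_via_join(dag_id: str, task_id: str, n: int):
    """Sketch: order on DagRun.execution_date via an explicit join."""
    session = Session()
    try:
        rows = (
            session.query(DagRun.execution_date)
            .join(
                TaskInstance,
                and_(
                    TaskInstance.dag_id == DagRun.dag_id,
                    TaskInstance.run_id == DagRun.run_id,
                ),
            )
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                TaskInstance.state == State.SUCCESS,
            )
            .order_by(DagRun.execution_date.desc())
            .limit(n)
            .all()
        )
        return [row.execution_date for row in rows]
    finally:
        session.close()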

This script can also help determine what underlying SQL will be run; it can be run in a local virtualenv with Airflow installed:

from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance

session = Session()

query_val = (
    session.query(TaskInstance)
    .filter(TaskInstance.dag_id == 'foo')
    .order_by(TaskInstance.end_date.desc())
)

print(str(query_val))

Which at the time of writing in Airflow 2.5.1 returns (formatted for readability):

SELECT
    task_instance.try_number AS task_instance_try_number,
    task_instance.task_id AS task_instance_task_id,
    task_instance.dag_id AS task_instance_dag_id,
    task_instance.run_id AS task_instance_run_id,
    task_instance.map_index AS task_instance_map_index,
    task_instance.start_date AS task_instance_start_date,
    task_instance.end_date AS task_instance_end_date,
    task_instance.duration AS task_instance_duration,
    task_instance.state AS task_instance_state,
    task_instance.max_tries AS task_instance_max_tries,
    task_instance.hostname AS task_instance_hostname,
    task_instance.unixname AS task_instance_unixname,
    task_instance.job_id AS task_instance_job_id,
    task_instance.pool AS task_instance_pool,
    task_instance.pool_slots AS task_instance_pool_slots,
    task_instance.queue AS task_instance_queue,
    task_instance.priority_weight AS task_instance_priority_weight,
    task_instance.operator AS task_instance_operator,
    task_instance.queued_dttm AS task_instance_queued_dttm,
    task_instance.queued_by_job_id AS task_instance_queued_by_job_id,
    task_instance.pid AS task_instance_pid,
    task_instance.executor_config AS task_instance_executor_config,
    task_instance.updated_at AS task_instance_updated_at,
    task_instance.external_executor_id AS task_instance_external_executor_id,
    task_instance.trigger_id AS task_instance_trigger_id,
    task_instance.trigger_timeout AS task_instance_trigger_timeout,
    task_instance.next_method AS task_instance_next_method,
    task_instance.next_kwargs AS task_instance_next_kwargs,
    dag_run_1.state AS dag_run_1_state,
    dag_run_1.id AS dag_run_1_id,
    dag_run_1.dag_id AS dag_run_1_dag_id,
    dag_run_1.queued_at AS dag_run_1_queued_at,
    dag_run_1.execution_date AS dag_run_1_execution_date,
    dag_run_1.start_date AS dag_run_1_start_date,
    dag_run_1.end_date AS dag_run_1_end_date,
    dag_run_1.run_id AS dag_run_1_run_id,
    dag_run_1.creating_job_id AS dag_run_1_creating_job_id,
    dag_run_1.external_trigger AS dag_run_1_external_trigger,
    dag_run_1.run_type AS dag_run_1_run_type,
    dag_run_1.conf AS dag_run_1_conf,
    dag_run_1.data_interval_start AS dag_run_1_data_interval_start,
    dag_run_1.data_interval_end AS dag_run_1_data_interval_end,
    dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision,
    dag_run_1.dag_hash AS dag_run_1_dag_hash,
    dag_run_1.log_template_id AS dag_run_1_log_template_id,
    dag_run_1.updated_at AS dag_run_1_updated_at
FROM
    task_instance
    JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id
    AND dag_run_1.run_id = task_instance.run_id
WHERE
    task_instance.dag_id = ?
ORDER BY
    task_instance.end_date DESC
Bloomy answered 1/11, 2023 at 1:46 Comment(0)

In the latest version of Airflow:

from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

def last_execution_date(dag_id: str, task_id: str, n: int):
    session = Session()
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates = [ti.execution_date for ti in query_val]
    return execution_dates
Devilkin answered 1/2, 2023 at 21:56 Comment(0)
