If you are getting a syntax error in the generated queries from these examples, I managed to get this working using the following:
def last_execution_date(dag_id: str, task_id: str, n: int) -> list:
    """Return execution dates of the most recently finished successful runs of a task.

    Task instances are matched on ``dag_id``, ``task_id`` and a state of
    ``State.SUCCESS``, ordered by ``TaskInstance.end_date`` descending, and
    capped at ``n`` rows.

    Args:
        dag_id: ID of the DAG that owns the task.
        task_id: ID of the task within that DAG.
        n: Maximum number of task instances to look at.

    Returns:
        A list of at most ``n`` ``execution_date`` values, newest ``end_date``
        first. The list is empty when no task instance matches.
    """
    # NOTE(review): assumes Session is airflow.settings.Session and that
    # TaskInstance / State are imported at file level -- confirm.
    session = Session()
    try:
        query_val = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                TaskInstance.state == State.SUCCESS,
            )
            # Order by a column that lives on TaskInstance itself.
            .order_by(TaskInstance.end_date.desc())
            .limit(n)
        )
        # Iterate (and therefore execute) the query while the session is
        # still open, so every execution_date is read before the close.
        return [ti.execution_date for ti in query_val]
    finally:
        # Always hand the connection back, even if the query raises.
        session.close()
The problem with ordering on `execution_date` is that it necessitates a join to the `DagRun` model to pick up `execution_date`. Instead, I have ordered by a column that is present on the `TaskInstance` model itself. This code assumes that every task instance with a state of `State.SUCCESS` has an `end_date`, and that `end_date` accurately orders the task instances in execution order.
The following script can also help you determine what underlying SQL will be run. It can be run locally or in a virtualenv with Airflow installed:
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun
# Build the same kind of query as above, but print it instead of iterating it,
# so the SQL that SQLAlchemy generates can be inspected.
session = Session()
dag_filter = TaskInstance.dag_id == 'foo'
newest_end_first = TaskInstance.end_date.desc()
query_val = session.query(TaskInstance).filter(dag_filter).order_by(newest_end_first)
# print() applies str() to the query, which renders it as SQL text.
print(query_val)
At the time of writing, on Airflow 2.5.1, this prints the following (formatted for readability):
SELECT
task_instance.try_number AS task_instance_try_number,
task_instance.task_id AS task_instance_task_id,
task_instance.dag_id AS task_instance_dag_id,
task_instance.run_id AS task_instance_run_id,
task_instance.map_index AS task_instance_map_index,
task_instance.start_date AS task_instance_start_date,
task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration,
task_instance.state AS task_instance_state,
task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname,
task_instance.unixname AS task_instance_unixname,
task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool,
task_instance.pool_slots AS task_instance_pool_slots,
task_instance.queue AS task_instance_queue,
task_instance.priority_weight AS task_instance_priority_weight,
task_instance.operator AS task_instance_operator,
task_instance.queued_dttm AS task_instance_queued_dttm,
task_instance.queued_by_job_id AS task_instance_queued_by_job_id,
task_instance.pid AS task_instance_pid,
task_instance.executor_config AS task_instance_executor_config,
task_instance.updated_at AS task_instance_updated_at,
task_instance.external_executor_id AS task_instance_external_executor_id,
task_instance.trigger_id AS task_instance_trigger_id,
task_instance.trigger_timeout AS task_instance_trigger_timeout,
task_instance.next_method AS task_instance_next_method,
task_instance.next_kwargs AS task_instance_next_kwargs,
dag_run_1.state AS dag_run_1_state,
dag_run_1.id AS dag_run_1_id,
dag_run_1.dag_id AS dag_run_1_dag_id,
dag_run_1.queued_at AS dag_run_1_queued_at,
dag_run_1.execution_date AS dag_run_1_execution_date,
dag_run_1.start_date AS dag_run_1_start_date,
dag_run_1.end_date AS dag_run_1_end_date,
dag_run_1.run_id AS dag_run_1_run_id,
dag_run_1.creating_job_id AS dag_run_1_creating_job_id,
dag_run_1.external_trigger AS dag_run_1_external_trigger,
dag_run_1.run_type AS dag_run_1_run_type,
dag_run_1.conf AS dag_run_1_conf,
dag_run_1.data_interval_start AS dag_run_1_data_interval_start,
dag_run_1.data_interval_end AS dag_run_1_data_interval_end,
dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision,
dag_run_1.dag_hash AS dag_run_1_dag_hash,
dag_run_1.log_template_id AS dag_run_1_log_template_id,
dag_run_1.updated_at AS dag_run_1_updated_at
FROM
task_instance
JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id
AND dag_run_1.run_id = task_instance.run_id
WHERE
task_instance.dag_id = ?
ORDER BY
task_instance.end_date DESC