How to get the latest execution time of a DAG run in Airflow

I tried the code below, but I am still getting an error:

from airflow.models import DagModel

def get_latest_execution_date(**kwargs):
    session = airflow.settings.Session()
    f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")
    try:
        Insurance_last_dag_run = session.query(DagModel)
        for Insdgrun in Insurance_last_dag_run:
            if Insdgrun is None:
                f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
            else:
                f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
    except:
        session.rollback()
    finally:
        session.close()

t1 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_latest_execution_date,
    dag=dag)

Is there a way to fix this and get the latest DAG run time information?

Rebeca answered 6/9, 2020 at 11:19 Comment(7)
A quick search returns several results, of which (1) Apache airflow macro to get last dag run execution time, (2) Getting the date of the most recent successful DAG execution and (3) How to get last two successful execution dates of Airflow job? appear applicable to your problem. Also, please elaborate on "..but still i am getting issue..". - Danger
@Danger Thanks for the reply. I am getting: invalid function "get_last_dag_run". - Rebeca
And I am trying to get all the DAGs, not any specific DAG. - Rebeca
@Danger Can you please suggest which module I need to import to get the last successful execution date of the DAG? I tried the dag model, but I could not get it. - Rebeca
(Assuming you are going to run this script through some task / operator) you need to use the dag property of the task: my_task.dag, where my_task is a reference to your task / operator (in a custom operator, you could use self). - Danger
@Danger I pasted my code. If possible, can you please have a look and suggest changes? - Rebeca
@ Can anyone please help with this question? - Rebeca

There are multiple ways to get the most recent execution of a DagRun. One way is to make use of the Airflow DagRun model.

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None


dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
    print(f'The most recent DagRun was executed at: {dag_run.execution_date}')

You can find more information on the DagRun model and its properties in the Airflow documentation.
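Since the question asks about all DAGs rather than a single one, the same idea can be applied per DAG id. The following is a minimal sketch, not a definitive implementation: `latest_run` and `latest_runs_by_dag` are hypothetical helper names, `max` is used in place of the full sort, and the stand-in objects only mimic the `dag_id` and `execution_date` attributes of a `DagRun`. In Airflow the list would come from `DagRun.find()`; I am assuming its `dag_id` argument can be left out to fetch runs for every DAG, which is worth checking against your release.

```python
from datetime import datetime
from types import SimpleNamespace


def latest_run(runs):
    """Return the run with the greatest execution_date, or None if there are none."""
    return max(runs, key=lambda run: run.execution_date, default=None)


def latest_runs_by_dag(runs):
    """Map each dag_id to its most recent run."""
    latest = {}
    for run in runs:
        current = latest.get(run.dag_id)
        if current is None or run.execution_date > current.execution_date:
            latest[run.dag_id] = run
    return latest


# Stand-in objects (hypothetical data) that mimic DagRun attributes.
# In Airflow the list would come from DagRun.find() instead.
runs = [
    SimpleNamespace(dag_id="dag_a", execution_date=datetime(2020, 9, 1)),
    SimpleNamespace(dag_id="dag_a", execution_date=datetime(2020, 9, 5)),
    SimpleNamespace(dag_id="dag_b", execution_date=datetime(2020, 8, 30)),
]

# Print one "dag_id,date" line per DAG, similar to the file layout in the question.
for dag_id, run in sorted(latest_runs_by_dag(runs).items()):
    print(f"{dag_id},{run.execution_date:%Y-%m-%d}")
```

Using `max` with `default=None` avoids sorting the whole list and handles the empty case without an explicit length check.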

Gazette answered 17/9, 2020 at 1:26 Comment(0)

The PythonOperator's op_args parameter is templated, so it can carry a Jinja expression.

The callable only writes the latest execution date to a file, so you can implement the function as follows:

def store_last_execution_date(execution_date):
    '''Writes the latest execution date to a file, overwriting any existing content.
    :param execution_date: The last execution date of the DagRun.
    '''

    with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+") as f:
        f.write(execution_date)


t1 = PythonOperator(
         task_id="records",
         provide_context=True,
         python_callable=store_last_execution_date,
         op_args=[
             "{{dag.get_latest_execution_date()}}",
         ],
         dag=dag
     )
Ire answered 12/9, 2020 at 9:30 Comment(2)
Tried the exact same snippet and it does not work! - Puseyism
With Airflow >= 1.10.6 you can use macros in templates. Just replace "{{dag.get_latest_execution_date()}}" with "{{ prev_execution_date }}". Note: the spaces between the text and the opening and closing curly braces are important. Reference in the Airflow docs: airflow.apache.org/docs/apache-airflow/1.10.6/… - Carrera
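To make the suggestion in the comments concrete, here is a hedged sketch of a callable that receives the rendered date as a plain argument. The parameter names `last_run_date` and `path` are hypothetical, and the Airflow wiring appears only in comments because the import path and available template variables depend on your version. It assumes, as this answer states, that `op_args` is rendered as a template, and it uses the `prev_execution_date` variable named in the comment above.

```python
import os
import tempfile


def store_last_execution_date(last_run_date, path):
    """Overwrite the file at `path` with the given date string."""
    with open(path, "w") as f:
        f.write(f"{last_run_date}\n")


# Assumed wiring inside a DAG file (not executed here):
# t1 = PythonOperator(
#     task_id="records",
#     python_callable=store_last_execution_date,
#     op_args=[
#         "{{ prev_execution_date }}",
#         "/home/Insurance/InsuranceDagsTimestamp.txt",
#     ],
#     dag=dag,
# )

# Local check with a temporary file and a made-up date string.
demo_path = os.path.join(tempfile.mkdtemp(), "timestamp.txt")
store_last_execution_date("2020-09-11T00:00:00+00:00", demo_path)
with open(demo_path) as f:
    print(f.read().strip())
```

The parameter is named `last_run_date` rather than `execution_date` so that it cannot be confused with the context variable of the same name.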

If you're looking for the date itself, as I was when I got here, you can use this (a small addition to Josh's answer, which helped me with this):

from airflow.models import DagRun

def get_last_dag_run_date(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    # Guard against an empty list before indexing into it
    if not dag_runs:
        raise ValueError(f"No runs found for DAG: {dag_id}")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    # Format the most recent execution date as mmddYYYY
    return dag_runs[0].execution_date.strftime('%m%d%Y')

In this case it returns the date formatted as mmddYYYY.
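As a quick illustration of that format, the snippet below applies the same `strftime` pattern to a made-up date and contrasts it with an ISO-style pattern, which sorts chronologically when compared as text. The sample date is hypothetical.

```python
from datetime import datetime

sample = datetime(2020, 9, 17, 1, 26)  # made-up execution date

mmddyyyy = sample.strftime("%m%d%Y")   # pattern used in the answer
iso_day = sample.strftime("%Y-%m-%d")  # alternative that sorts as text

print(mmddyyyy)  # 09172020
print(iso_day)   # 2020-09-17
```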

Selective answered 4/7 at 15:7 Comment(0)
