How to trigger an Airflow DAG run from within a Python script?

Using Apache Airflow, I created some DAGs, some of which do not run on a schedule.
I'm trying to find a way to trigger a run for a specific DAG from within a Python script. Is this possible? How can I do it?

EDIT --- The Python script will be running from a different project than the project where all my DAGs are located.

Ayrshire asked 4/2, 2020 at 10:15

You have a variety of options when it comes to triggering Airflow DAG runs.

Using Python

The airflow Python package provides a local client you can use to trigger a DAG from within a Python script. For example:

from airflow.api.client.local_client import Client

# Client(api_base_url, auth): both arguments are unused by the
# local client, so None is fine here.
c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})

Using the Airflow CLI

You can trigger DAGs manually using the Airflow CLI: the command is airflow dags trigger <dag_id> in Airflow 2.x, or airflow trigger_dag <dag_id> in Airflow 1.x. See the Airflow CLI documentation for more on how to use the CLI to trigger DAGs.
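
If you want to drive the CLI from Python, you can shell out to it. A minimal sketch, assuming the airflow executable is on the PATH of an environment pointed at your Airflow instance (the DAG id here is a placeholder):

import subprocess

def trigger_dag_via_cli(dag_id, conf="{}"):
    # `airflow dags trigger` is the Airflow 2.x command;
    # on Airflow 1.x use `airflow trigger_dag` instead.
    subprocess.run(
        ["airflow", "dags", "trigger", dag_id, "--conf", conf],
        check=True,  # raise if the CLI exits non-zero
    )

trigger_dag_via_cli("test_dag_id")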

Using the Airflow REST API

You can also use the Airflow REST API to trigger DAG runs. See the Airflow REST API documentation for details.
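
A minimal sketch using the requests library, assuming an Airflow 2.x instance with the stable REST API and basic auth enabled (the host, credentials, and DAG id below are placeholders):

import requests

def trigger_dag_via_rest(dag_id):
    # POST /api/v1/dags/{dag_id}/dagRuns creates a new DAG run
    # (Airflow 2.x stable REST API).
    url = f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns"
    response = requests.post(
        url,
        json={"conf": {}},
        auth=("user", "pass"),  # placeholder credentials
    )
    response.raise_for_status()
    return response.json()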


The first option, from within Python, might work best for you (it's also how I've personally done it in the past). But you could also use a subprocess to drive the CLI from Python, or a library like requests to call the REST API, as sketched above.

Stannic answered 5/2, 2020 at 5:31
do you know how I can import all of my DAGs from the other project? – Ayrshire
If you're trying to do it from within another Python project, you'd need to make sure that project has access to the same Airflow config, database back-end, and DAGs. You can then import the airflow module and you should be good to go. Alternatively, you could expose your running instance of Airflow via HTTP and use a library like requests in your second project to make HTTP requests to the running Airflow stack to trigger the DAG. – Stannic
I'm still not able to run a DAG defined in the test via the local_client. If you could take a look at this question, I'd greatly appreciate it! – Pabulum

On AWS MWAA (Airflow 1.10.12) I used a solution based on the boto3 library for Python and a REST POST request:

import boto3
import requests

def TriggerAirflowDAG(mwaa_environment, dag_id):
    # Request a short-lived CLI token for the MWAA environment.
    client = boto3.client("mwaa")
    token = client.create_cli_token(Name=mwaa_environment)
    # POST the Airflow 1.x CLI command to MWAA's CLI endpoint.
    url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
    body = f"trigger_dag {dag_id}"
    headers = {
        "Authorization": "Bearer " + token["CliToken"],
        "Content-Type": "text/plain"
    }
    return requests.post(url, data=body, headers=headers)

The user/role that initiates the DAG run must have the AmazonMWAAAirflowCliAccess policy.
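
MWAA returns the CLI command's output base64-encoded in the response body; a hedged sketch of reading it, reusing the function above (the environment and DAG names are placeholders):

import base64

response = TriggerAirflowDAG("my-mwaa-env", "my_dag_id")
result = response.json()
# MWAA wraps the CLI's stdout/stderr as base64-encoded strings.
print(base64.b64decode(result["stdout"]).decode("utf-8"))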

Erikerika answered 25/6, 2021 at 14:45

I use this snippet for that:

import requests

def trigger_my_dag(**kwargs):
    """
    Equivalent curl call:

    curl --location 'https://{my_airflow_url}/api/v1/dags/{my_dag_name}/dagRuns' \
        --header 'Content-Type: application/json' \
        --header 'Cache-Control: no-cache' \
        --header 'Authorization: Basic ****' \
        --data '{"conf": {}}'
    """
    # Replace the {my_airflow_url}/{my_dag_name} placeholders and the
    # masked Basic auth token with your own values.
    url = "https://{my_airflow_url}/api/v1/dags/{my_dag_name}/dagRuns"
    data = {"conf": {}}
    headers = {
        "Authorization": "Basic ****",
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
    }
    response = requests.post(url, json=data, headers=headers)
    # The stable REST API responds 200 when the DAG run is created.
    return response.status_code == 200
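
For example, once the placeholders are filled in:

# Returns True when Airflow responds 200 (run created).
if trigger_my_dag():
    print("DAG run created")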
Lafontaine answered 20/1 at 15:48
