How to use a table as Dataset for airflow in Data-aware scheduling?
Asked Answered
H

3

5

I have seen sample for the new data-aware scheduling where datatset is a file(csv,txt e.t.c) I wanted to check if i can use a SQL server table as Dataset?

from airflow import Dataset
dataset = Dataset(uri='{something to point to table}')
with DAG(dag_id='dataset-consumer', schedule=[dataset]):
Heyman answered 28/3, 2023 at 16:58 Comment(1)
Good question. The sheer lack of documented examples regarding specific usable URI's in Airflow Datasets is unfortunate.Emmanuel
B
5

A Dataset is really just a reference string that tracks the status of the task it is inside (using the task's outlets parameter). It is a potentially more transparent and scalable way to create dependencies in Airflow.

Datasets allow the success of one task to trigger other DAGs, with minimal code. They also allow for some separation, similar to the function of an api between different applications. However, datasets do not connect to, track, or even know about your actual data.

Datasets and URIs are summarized well by Marc Lamberti from Astronomer in this YouTube video on Datasets:

[You] can put pretty much whatever you want for the URI of your dataset. It's because Airflow doesn't care about if the actual data is updated or not. Indeed the only thing that Airflow monitors is if the task that updates the dataset successfully completes or not. If it successfully completes then the DAG is triggered. If it fails then the DAG is not triggered. It is as simple as that.

think of the URI as the unique identifier of a dataset and not as a way for Airflow to access the actual data of your dataset. That means if another tool like Spark updates that dataset, Airflow will not be aware of that and your DAG won't be triggered.

Note: If you need to listen for external data changes, Airflow Sensors are still the way to go.

The Airflow Dataset documentation, while more technically worded, supports this:

Airflow makes no assumptions about the content or location of the data represented by the URI. It is treated as a string

These are all valid ways to create Datasets:

mysql_data = Dataset('mysql://localhost:3306/database_name?table=table_name')
bigquery_table = Dataset('bigquery://gcp-project-name/dataset-name/table-name')
some_table = Dataset('table://database/table_name')
some_other_table = Dataset('table_name')
some_file = Dataset('file_name.csv')

While it doesn't technically matter what string you choose, it's typically to your benefit to define it clearly. I referenced the mysql uri docs to create the mysql one. Bigquery doesn't have a direct URI, so I made up my own to reference. You can even use simple strings. You determine how detailed you want to be.

Datasets have a very simple set of options now (as of Airflow v2.7.1), but they likely lay the foundation for more data-aware pipelines in future Airflow versions.

Bearden answered 16/9, 2023 at 23:26 Comment(1)
Thank you... This was much more helpful than the official documentation...Contaminant
T
4

Yes, your URI can represent anything. It doesn’t even have to point to anything real, you just have to use the same string in the tasks that update it and the DAGs that trigger from it.

Thereby answered 1/4, 2023 at 17:27 Comment(5)
As much as we get the very generic concept, a few examples would be welcome (s3 is too obvious and way too specific)Emmanuel
“foo” is a valid Dataset name. So is “stackoverflow.com”. Airflow doesn’t know anything about what the name means at all.Thereby
wait. do you mean its "just" a label ? If that is so this so darn confusing honestly. Why would anyone do that ? I mean, the documentation does read "Airflow makes no assumptions about the content or location of the data represented by the URI. It is treated as a string". But I am puzzled by the whole mechanic behind it. How does Airflow actually know the dataset has been effectively updated ?Emmanuel
It doesn’t. It only knows that a task that you listed has an output of this Dataset ran successfully.Thereby
I see. What is the difference with any "success status" of any generic operator that does not involve data then ? Looks like a dummy dataset-awareness to me... Or I'm missing something.Emmanuel
E
4

Long Story Short :
Yes it is possible

Here is a fully working code :

import psycopg2
from datetime import datetime
from airflow import DAG, Dataset
from airflow.operators.python_operator import PythonOperator

# define dataset via complete URI to the pg table
pg_dataset=[Dataset("postgres://user:passwd@postgres:5432/mydatabase?table=my_test_table")]

# DAG That writes into pg table and outlets the pg_dataset
with DAG(
    'write_to_postgres_ux',
    start_date=datetime(2023, 4, 9),
    schedule_interval='@daily'
)as dag:

    def write_to_postgres():
        conn = psycopg2.connect(
            host="postgres",
            database="airflow",
            user="airflow",
            password="airflow"
        )
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS my_test_table (col1 TEXT,col2 TEXT)
        """)
        cur.execute("INSERT INTO my_test_table (col1, col2) VALUES ('value1', 'value2')")
        conn.commit()
        cur.close()
        conn.close()

    write_task = PythonOperator(
        task_id='write_task',
        outlets=pg_dataset,       <----- This line here is important
        python_callable=write_to_postgres,
        dag=dag
    )

# DAG That reads from pg table and scheduled by pg_dataset update
with DAG(
    'read_from_postgres_ux',
    start_date=datetime(2023, 1, 1),
    schedule = pg_dataset  <----- This line here is important
) as dag:

    def read_from_postgres():
        conn = psycopg2.connect(
            host="postgres",
            database="airflow",
            user="airflow",
            password="airflow"
        )
        cur = conn.cursor()
        cur.execute("SELECT col1, col2 FROM my_test_table")
        rows = cur.fetchall()
        for row in rows:
            print(row)
        cur.close()
        conn.close()

    read_task = PythonOperator(
        task_id='read_task',
        python_callable=read_from_postgres,
        dag=dag
    )

Now when your run the write DAG, data is written in the dataset, wich triggers the Read DAG, and voila !

enter image description here

PS : If someone knows how to do this with redis, I welcome any hint (not familiar with redis and couldnt come up with a working URI as it needs to include the table as well, not just the database)

Emmanuel answered 9/4, 2023 at 11:16 Comment(1)
Please note that the dataset and the events that can be associated with them are not encrypted, and the Airflow documentation warns you not to but any sensitive information in them.Homeland

© 2022 - 2024 — McMap. All rights reserved.