Airflow DAG dependencies not available to DAGs when running Google Cloud Composer

Airflow allows you to put dependencies (external Python code that the DAG code relies on) in the DAG folder. Any components, members, or classes in that external Python code are then available for use in the DAG code.

When doing this in the GCS DAG folder of the Cloud Composer environment, however, the dependencies' components are not available to the DAGs. An error similar to the following is displayed in the Airflow web UI: Broken DAG: [/home/airflow/gcs/dags/....py] No module named tester, where tester is a separate Python file in the dags folder.

When testing those tasks using Google's SDK (running actual Airflow commands), the tasks run fine, but somewhere in Kubernetes, when those container jobs are created, the dependencies do not seem to be carried over.

I realise Cloud Composer is in beta, but I was wondering if I am doing something wrong.

Deprecatory answered 16/5, 2018 at 15:36 Comment(2)
Are you sure the dependencies are being ignored or could it be that the Airflow base dir / Python path does not include /home/airflow/gcs/dags?Cene
I'm not sure if /home/airflow/gcs/dags is included in the Python path; it probably isn't, but since Composer uses Kubernetes I don't think that is something I can control. I am also not sure about the Airflow base dir — is that something you can control in the config file? Please excuse my ignorance, I am new to Airflow.Deprecatory

You should put the module in a separate folder that contains an __init__.py file (Airflow doesn't like __init__.py files in its top-level DAGs directory).

For example, if you have the following directory structure:

dags/
    my_dag.py
    my_deps/
        __init__.py
        dep_a.py
        dep_b.py

You can write from my_deps import dep_a, dep_b in my_dag.py.
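As a rough sketch, my_dag.py could then look like the following (only the my_deps package layout comes from the listing above; the DAG id, schedule, and some_callable defined in dep_a.py are assumptions for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# my_deps/ sits next to this file in the dags/ folder, so it resolves as a package.
from my_deps import dep_a, dep_b

dag = DAG('my_dag', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

run_dep_a = PythonOperator(
    task_id='run_dep_a',
    python_callable=dep_a.some_callable,  # hypothetical function defined in dep_a.py
    dag=dag,
)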

Encrust answered 30/5, 2018 at 19:13 Comment(3)
How do I call a dependency when dep_a.py depends on some function defined in dep_b.py and the PythonOperator in my_dag calls a python_callable in dep_a.py?Wrapped
In dep_a, you can from dep_b import foo. In my_dag, from my_deps import dep_a; dep_a.some_callable() should work.Encrust
The above works great for DAGs, but how about plugins? Following the same logic for the /plugins directory doesn't seem to work. I have /plugins/plugin.py and /plugins/dependencies/dep_a, and calling import dependencies.dep_a fails. I have __init__.py in all directories except /plugins.Gorham

Are you looking for how to install Python dependencies? https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

Also, the DAGs folder that lives in your GCS bucket (gcloud beta composer environments describe [environment] to get this bucket; gs://{composer-bucket}/dags) should map to /home/airflow/gcs/dags in your pods. Have you tried SSHing into a node to find this?

Benevolence answered 17/5, 2018 at 19:41 Comment(3)
Not a PyPI package, just an external Python file that I drop in the DAG folder. I SSH'd into the nodes; /home/airflow/gcs/dags does not exist. Should it?Deprecatory
It should; and you should be able to point to that file (perhaps not in the dags/ folder, but maybe in a data/ folder). When you gcloud beta composer environments describe [your environment], do you see a link to a GCS bucket? That bucket should host your dags.Benevolence
I think your issue is that you're putting the file in the DAGs folder. Your environment is configured to try to parse everything in that directory as a DAG, which is why you're getting that error that the DAG is broken! Putting the file in your data/ directory should do the trick. :)Benevolence

I had the same issue and had help resolving it on the mailing list. For reference, see the thread here: https://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY. There's a link to a handy Github Gist with some comments on it as well.

In order to write and import your own dependencies into your DAGs, you'll want to zip your dags and their dependencies as described here: https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags.

You can upload that zip file directly to your Cloud Composer GCS bucket and Airflow will pick it up.
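As a rough illustration of the packaged-DAG layout from the Airflow docs, the archive keeps the DAG module at its root and the dependency package in a subdirectory below it. The file names below reuse the example further down; the output name foo_dag.zip and the gsutil destination are assumptions:

import zipfile

# Build a packaged DAG: the DAG module sits at the root of the zip,
# its dependency package alongside it in a subdirectory.
with zipfile.ZipFile('foo_dag.zip', 'w') as zf:
    zf.write('foo_dag.py')
    zf.write('foo_dep/__init__.py')
    zf.write('foo_dep/foo_dep.py')

# Then upload it to the environment's bucket, e.g.:
#   gsutil cp foo_dag.zip gs://{composer-bucket}/dags/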

Make sure your dependencies are packages, not modules, at the top-level of your dags directory.

from foo_dep.foo_dep import my_utility_function will work here:

foo_dag.py
foo_dep/__init__.py
foo_dep/foo_dep.py

from foo_dep import my_utility_function seems like it should work with the following dags directory structure (and will work locally), but it will not work in Airflow:

foo_dag.py
foo_dep.py
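For reference, a minimal sketch of the top of foo_dag.py under the working layout (my_utility_function is the name from the import shown above; the rest of the DAG file would follow as usual):

# foo_dag.py, sitting next to the foo_dep/ package
from foo_dep.foo_dep import my_utility_function   # works: foo_dep is a package

# from foo_dep import my_utility_function         # works locally with a flat foo_dep.py,
#                                                  # but not once deployed to Airflow here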
Sisak answered 18/6, 2018 at 22:2 Comment(0)

From the official docs on configuring Airflow:

The first time you run Airflow, it will create a file called airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). This file contains Airflow’s configuration and you can edit it to change any of the settings

In this file, set the very first setting, the base path to Airflow:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow/gcs/dags

Cene answered 17/5, 2018 at 9:30 Comment(3)
Right, yeah, that is set up correctly by Google's Composer and the dependencies are in that directory, which is why when you test the tasks in those DAGs they pass with the correct dependencies. The dependencies are ignored during the actual DAG runs orchestrated by Kubernetes.Deprecatory
@SorooshAvazkhani I haven't been working with Kubernetes that much. It would be interesting to know how exactly the DAG run works out. Is the configuration ignored? Is it another node where the file structure is different, so a relative path might be needed? Can you expose some sort of relative file path? Should Kubernetes know about the Python path?Cene
I am trying to get my head around DAG runs now. For sure the file structure /home/airflow/gcs/dags does not exist on the nodes. I will post back once I know how Kubernetes creates workloads from those DAGs, but it looks like they just take over the DAGs themselves and not the dependencies. One way to get around this is to write your dependencies as Airflow plugins, which you can add to the Composer environment, but it seems to me a basic shortfall on their end. Again though, I may be doing something stupid.Deprecatory
