Airflow error importing DAG using plugin - Relationships can only be set between Operators
I have written an Airflow plugin that contains a single custom operator (to support CMEK in BigQuery). I can create a simple DAG with a single task that uses this operator, and it executes fine.

However, if I try to create a dependency in the DAG from a DummyOperator task to my custom operator task, the DAG fails to load in the UI and throws the following error, and I can't understand why:

Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator

I have tested this so far on composer-1.4.2-airflow-1.9.0, composer-1.4.2-airflow-1.10.0 and composer-1.4.1-airflow-1.10.0.

Running airflow test for each of the tasks completes without error.

Using the operator in isolation in a DAG works fine (as below), so I don't believe there is anything inherently wrong with the plugin:

import datetime
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019, 1, 1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag     = dag
)

Whereas if I introduce a DummyOperator and a dependency, the error occurs:

import datetime
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator

default_dag_args = {
    'start_date': datetime.datetime(2019, 1, 1),
    'retries': 0
}

dag = DAG(
    'js_bq_custom_plugin_v2',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

etl_start = DummyOperator(task_id='etl_start', dag=dag)

extract = BQCMEKOperator(
    task_id     = 'extract',
    sql         = 'select * from foo.bar LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test5',
    key         = 'xxx',
    dag         = dag
)

etl_start.set_downstream(extract)

The operator itself is straightforward, and I can reproduce the issue with even the simplest custom operator, such as the one below:

import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class TestOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                *args,
                **kwargs):
        super(TestOperator, self).__init__(*args, **kwargs)


    def execute(self, context):
        logging.info("Executed by TestOperator")

With the following plugin definition in __init__.py:

from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator

class TestPlugin(AirflowPlugin):
    name = "test_plugin"
    operators = [TestOperator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
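
For reference, the plugin definition above assumes a directory layout along these lines under the Airflow plugins folder (the names are illustrative, matching the imports used in this question):

```
plugins/
└── test_plugin/
    ├── __init__.py          # contains the TestPlugin definition above
    └── operators/
        ├── __init__.py
        └── test_operator.py # contains TestOperator
```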

Also, having looked at the Airflow code in models.py that generates this error, it uses isinstance(t, BaseOperator), and this returns True for a task using my custom operator when I run it in plain Python, so I have no idea what is going on:

for t in task_list:
    if not isinstance(t, BaseOperator):
        raise AirflowException(
            "Relationships can only be set between "
            "Operators; received {}".format(t.__class__.__name__))
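
(Aside, not the confirmed root cause here: this kind of isinstance failure can occur in plain Python whenever the same class source is executed under two different module names, which is exactly what can happen when an operator is importable both from its plugin package and from the airflow.operators plugin namespace. A minimal stdlib sketch, with purely hypothetical module names:)

```python
import importlib.util
import os
import tempfile

# A stand-in for a custom operator's source file.
SOURCE = "class TestOperator:\n    pass\n"


def load_as(module_name, path):
    """Load the file at `path` as a module named `module_name`."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "test_operator.py")
    with open(path, "w") as f:
        f.write(SOURCE)

    # Same file, loaded twice under different (hypothetical) module names --
    # e.g. once directly and once via a plugins-style namespace.
    direct = load_as("test_plugin.operators.test_operator", path)
    via_plugin = load_as("airflow.operators.test_plugin", path)

    task = via_plugin.TestOperator()
    # Two loads produce two distinct class objects, so isinstance checks
    # against the "other" copy of the class fail:
    print(isinstance(task, via_plugin.TestOperator))  # True
    print(isinstance(task, direct.TestOperator))      # False
```

So a DAG and the scheduler can disagree about what "the" operator class is if they import it along different paths, even though each copy individually subclasses BaseOperator.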
Statics answered 23/1, 2019 at 21:35 Comment(1)
Very strange; could this be a Pythonic issue given that it originates here? – Mayers
There was a bug introduced in the composer-1.4.2 release, which we have now fixed; try creating a new Composer environment, and that DAG error should go away. Meanwhile, we'll also apply the fix automatically to existing 1.4.2 environments over the next few days.

Mayonnaise answered 25/1, 2019 at 7:30 Comment(2)
If you don't want to wait for the automatic fix rollout or create a new environment, you may apply the fix manually by running the following commands against your Composer GKE cluster:

kubectl get deployment airflow-scheduler -o yaml | sed -e 's/cloud_composer_service_2019-01-10-RC0/cloud_composer_service_2019-01-10-RC1/' | kubectl replace --force -f -

kubectl get deployment airflow-worker -o yaml | sed -e 's/cloud_composer_service_2019-01-10-RC0/cloud_composer_service_2019-01-10-RC1/' | kubectl replace --force -f -

– Mayonnaise
Thank you so much; I also had this issue yesterday and was going mad! :joy: – Rickey