Pass other arguments to on_failure_callback

3

12

I'd like to pass other arguments to my on_failure_callback function, but it only seems to accept "context". How do I pass other arguments to that function? I'd especially like to define it in a separate module so it can be used in all my DAGs.

My current default_args looks like this:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure,
  'max_active_runs': 1
}

If I try something like this, Airflow complains:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure(context,arg1,arg2),
  'max_active_runs': 1
}

So I'm not sure how to pass arg1 and arg2 to my notify_failure function, which I would like to define in a separate module that I can simply import into my DAG.

Harney answered 15/8, 2018 at 0:53 Comment(2)
What type of arguments are you talking about? Are they static/consistent between task runs? - Gregoriagregorian
Yes, they are static between runs, but it seems that the function I set as my on_failure_callback cannot take any extra arguments. I added a bit more detail to the question to hopefully make this clearer. - Harney
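For what it's worth, the second form most likely fails because notify_failure(context, arg1, arg2) is an ordinary call expression: Python evaluates it while the DAG file is being parsed, when no context exists yet, and would store the return value rather than a function. A minimal sketch in plain Python (no Airflow needed; the function body and the "task" key are hypothetical stand-ins):

```python
def notify_failure(context, arg1, arg2):
    # Hypothetical body, only here so the function returns something.
    return "failed: {} ({}, {})".format(context["task"], arg1, arg2)

# Passing the function object defers the call; nothing runs yet.
deferred = {"on_failure_callback": notify_failure}

# A call expression runs immediately. `context` is not defined at this
# point, so Python raises NameError before the dict is even built.
try:
    eager = {"on_failure_callback": notify_failure(context, "a", "b")}
    error_name = None
except NameError as exc:
    error_name = type(exc).__name__
```

So the dict needs to hold something callable that already knows arg1 and arg2 and still waits for context.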
33

Assuming the args are something you can define at the DAG level, you can use partial from the functools module, i.e.:

from datetime import datetime, timedelta
from functools import partial

def generic_failure(arg1, arg2, context):
  # do whatever; Airflow supplies `context` when the task fails
  pass

# arg1 and arg2 are whatever static values you want bound to the callback
default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': partial(generic_failure, arg1, arg2),
  'max_active_runs': 1
}

Calling partial(generic_failure, arg1, arg2) returns a callable with arg1 and arg2 already bound, so it expects only the remaining arguments of generic_failure, which in the above example is the single parameter context.
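To see the binding without running Airflow, here is a small sketch that calls the partial the way a scheduler would, with one positional argument. The dict fake_context and its "task_id" key are stand-ins I made up for illustration; the real context Airflow passes has a different, richer shape.

```python
from functools import partial

def generic_failure(arg1, arg2, context):
    # Hypothetical body: combine the bound args with something from context.
    return "{} failed ({}, {})".format(context["task_id"], arg1, arg2)

# Bind the two leading positional parameters now.
callback = partial(generic_failure, "team-a", "high")

# Later, the caller supplies only the context.
fake_context = {"task_id": "load_table"}
message = callback(fake_context)
print(message)
```

Because generic_failure lives at module level, it can sit in a shared module and be imported by every DAG; each DAG then builds its own partial with its own values.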

Gregoriagregorian answered 15/8, 2018 at 20:3 Comment(2)
This answer should be accepted, it's elegant and it works! - Karynkaryo
This is a very nice solution. Worked for me. I was also able to get default values by doing: partial(failure_callback, named_arg="override_value") ... with the callback function defined like so: def failure_callback(context, named_arg="default_value"). That way, you can optionally pass the callback as normal (without using partial()) and you will get the default value for your arg. - Antipater
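A quick sketch of the keyword variant described in that comment, again in plain Python with a made-up fake_context dict standing in for what Airflow would pass:

```python
from functools import partial

def failure_callback(context, named_arg="default_value"):
    # Hypothetical body for illustration only.
    return "{}: {}".format(context["task_id"], named_arg)

fake_context = {"task_id": "load_table"}

# Used directly, the default applies.
plain = failure_callback(fake_context)

# Wrapped in partial with a keyword, the default is overridden.
overridden = partial(failure_callback, named_arg="override_value")(fake_context)
```

Note that with context as the first parameter, the extra value has to be bound by keyword; a positional partial(failure_callback, "x") would fill the context slot instead.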
5

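You can use a nested function (a closure) for this: the outer function takes your arguments and returns an inner function that takes only context.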

def generic_failure(arg1, arg2):
    def failure(context):
        message = 'we have a function that failed with args : {ARG1}, {ARG2}'.format(ARG1=arg1,ARG2=arg2)
        print(message)
        return message
    return failure

arg1 = 'arg1'
arg2 = 'arg2'

default_args = {
  'owner': 'Me',
  'on_failure_callback': generic_failure(arg1, arg2),
}
Indecorous answered 22/2, 2019 at 13:24 Comment(1)
If I want to loop over arg1 and arg2 as dynamic parameters, how do I pass them? - Wardlaw
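One way to handle the looping case, as a sketch: call the factory once per iteration, so each returned callback captures its own pair of values. The params list and the "task_id" key below are invented for illustration.

```python
def generic_failure(arg1, arg2):
    def failure(context):
        # Each closure remembers the arg1/arg2 it was created with.
        return "{} failed with args: {}, {}".format(
            context["task_id"], arg1, arg2
        )
    return failure

# Hypothetical per-task parameters.
params = [("orders", "team-a"), ("users", "team-b")]

# One callback per entry, keyed by the first parameter.
callbacks = {name: generic_failure(name, owner) for name, owner in params}

first = callbacks["orders"]({"task_id": "t1"})
second = callbacks["users"]({"task_id": "t2"})
```

In a DAG you would then presumably pass callbacks[name] as on_failure_callback on each operator created in the loop, rather than through default_args.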
1

As a heads-up, my workaround was to use a lambda that receives the context parameter and passes it, along with the extra argument, to the function you actually want to call:

on_failure_callback = lambda context: my_function(context, arg2)

Full example:

def my_function(context, arg2):
  # function code here
  pass

default_args = {
  'owner': 'myself',
  # ...
  # ...
  'on_failure_callback': lambda context: my_function(context, arg2),
}
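One caveat with the lambda approach, sketched below in plain Python: a lambda looks up arg2 when it is called, not when it is defined, so lambdas created in a loop all end up seeing the last value. Binding the value as a default argument avoids that. The my_function body and the "task_id" key are made up for the example.

```python
def my_function(context, arg2):
    # Hypothetical body for illustration only.
    return "{} / {}".format(context["task_id"], arg2)

values = ("a", "b")

# Late binding: every lambda reads `value` at call time.
late = [lambda context: my_function(context, value) for value in values]

# Early binding: the default argument freezes the current `value`.
bound = [
    lambda context, value=value: my_function(context, value)
    for value in values
]

ctx = {"task_id": "t"}
late_results = [cb(ctx) for cb in late]
bound_results = [cb(ctx) for cb in bound]
```

For a single static arg2 defined once at module level, as in the answer above, this does not matter; it only bites when the callbacks are built in a loop.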
Luck answered 3/8, 2023 at 11:38 Comment(0)
