Event Handling in Python Luigi

I've been trying to integrate Luigi as our workflow handler. Currently we are using Concourse, but many of the things we're trying to do are a hassle to get around in Concourse, so we made the switch to Luigi as our dependency manager. No problems so far: workflows trigger and execute properly.

The issue comes in when a task fails for whatever reason. In this case it is specifically the requires block of a task, but all cases need to be taken care of. As of right now, Luigi gracefully handles the error and writes it to STDOUT. It still emits an exit code of 0 though, which to Concourse means the job passed. A false positive.

I've been trying to use Luigi's event handling to fix this, but I cannot get the handler to trigger, even with an extremely simple job:

import sys

import luigi
import luigi.retcodes

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!") #testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)

class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []

    def run(self):
        pass

    def output(self):
        return []

Then I run this command in the Python shell:

luigi.run(main_task_cls=Test, local_scheduler=True)

The exception gets raised, but the event doesn't seem to fire. The file doesn't get written and the exit code is still 0.

Also, if it makes a difference, I have my Luigi config at /etc/luigi/client.cfg, which contains:

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

I'm at a loss as to why the event handler won't trigger, but somehow I need the process to fail on an error.

Instantaneous answered 28/2, 2017 at 19:49 Comment(1)
Hmm, I'm not too sure how to use Luigi, but basically you are going to need a non-zero exit code when the tests fail. The Concourse event flow is pretty simple: 1. Run the script specified in your task.yml. 2. Go green or red based on the exit code of the script.
Bipartisan
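
The flow described in this comment can be sketched in plain Python. This is only an illustration of the exit-code contract, not how Concourse is actually implemented; `run_step` and the two sample commands are made-up names, and the value 30 is just the `task_failed` code from the config in the question.

```python
import subprocess
import sys


def run_step(command):
    """Run a build step and report it the way a CI system would."""
    result = subprocess.run(command)
    # Only the exit code matters: 0 is a pass, anything else is a failure.
    return "green" if result.returncode == 0 else "red"


# Hypothetical steps: one exits cleanly, one exits with a non-zero code.
passing = [sys.executable, "-c", "import sys; sys.exit(0)"]
failing = [sys.executable, "-c", "import sys; sys.exit(30)"]
```

A step that prints a traceback but still exits with 0 would come out "green" here, which is exactly the false positive described in the question.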
3

It seems like the problem is where you place the "raise Exception" call. If you place it in the requires function, it runs while Luigi is resolving dependencies, before your Test task's run method is ever called. So from Luigi's point of view it's not that your Test task failed while running, but that resolving the tasks it depends on (right now, none) broke.

For example, if you move the raise into run, your code will behave as you expect.

    def run(self):
        print('start')
        raise Exception()

To handle the case where dependency resolution fails (here, the exception is raised in the requires method), you can register a handler for another Luigi event, luigi.Event.BROKEN_TASK. That handler fires in this situation, so you can use it to make sure Luigi ends with the non-zero return code you expect.

Cheers!

Penology answered 19/2, 2018 at 11:36 Comment(0)
1

If you'd like to catch exceptions in requires(), use the following:

@luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
def mourn_failure(task, exception):
    ...
Eberhart answered 6/8, 2019 at 14:47 Comment(0)
0

If I understand correctly, you just want Luigi to return an error code when a task fails. I had many issues with this myself, but it turns out to be quite simple: run it with the luigi command-line tool, not from Python. Like this:

luigi --module my_module MyTask

I don't know if that was your problem too, but I was running it from Python, and Luigi then ignored the retcodes in luigi.cfg. Hope it helps.
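
If you still need a Python entry point (for a wrapper script, say), one option is to shell out to the luigi command and pass its exit code through. This is a rough sketch: `luigi_command` and `run_luigi` are made-up helper names, `my_module` and `MyTask` are the placeholders from the command above, and `--local-scheduler` is only assumed here because the question used local_scheduler=True.

```python
import subprocess


def luigi_command(module, task, *extra_args):
    """Build the argument list for the luigi command-line tool."""
    return ["luigi", "--module", module, task, *extra_args]


def run_luigi(module, task, *extra_args):
    """Run luigi as a child process and return its exit code unchanged."""
    return subprocess.run(luigi_command(module, task, *extra_args)).returncode


# Example argument list for the task named in this answer.
example = luigi_command("my_module", "MyTask", "--local-scheduler")
```

A wrapper would then end with something like `sys.exit(run_luigi("my_module", "MyTask", "--local-scheduler"))`, so whatever code the luigi command returns is what the CI system sees.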

Wileywilfong answered 5/3, 2018 at 19:48 Comment(0)
