Can Luigi propagate exception or return any result?
Asked Answered
S

2

9

I am using Luigi to launch some pipeline. Let's take a simple exemple

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

Now let's say that myTask is raising an exception during execution. All that I am able to have is a log from luigi showing the exception.

Is there any way that luigi could propagate it or at least return a failure status ?

I would then be able to make my programm react in function of that state.

Thanks.

EDIT I forgot to specify that luigi's outputs are targetting a database when I am storing the result. If an exception is raised, no result are stored but the exception is not propagated out a luigi. I was wondering if luigi have an option to have this.

Shepard answered 15/9, 2015 at 15:47 Comment(2)
What do you mean propagate it or return a failure status? Anywhere that the program fails, you can put in a try except clause, and then you could use the except clause to write to luigi output. Then you could read in the luigi output and react accordingly.Twohanded
Sorry, your comment made me realize that I need to be more specific on the way I am using luigi and the output. I am editing.Shepard
K
22

From docs:

Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).

Example:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

Luigi has a lot of events you can choose from. You can also take a look at this tests in order to learn how to listen and react to other events.

Kuhlmann answered 28/10, 2015 at 16:37 Comment(1)
Adding that newer version of luigi handle now more Event. It became easier to handle error in the process of a task.Shepard
T
-1

What you could do is write an error to file. For instance, in your task that may fail (let's call it TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

Then, in a task that depends on that error, that task would require TaskA. And you could do something like this:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff
Twohanded answered 29/9, 2015 at 15:49 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.