How to ignore failures on Luigi tasks triggered inside another task's run()

Consider the following tasks:

import luigi


class YieldFailTaskInBatches(luigi.Task):
    def run(self):
        for i in range(5):
            yield [
                FailTask(i, j)
                for j in range(2)
            ]


class YieldAllFailTasksAtOnce(luigi.Task):
    def run(self):
        yield [
            FailTask(i, j)
            for j in range(2)
            for i in range(5)
        ]

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

FailTask fails if j > 0. YieldFailTaskInBatches yields FailTask instances in batches inside a for loop, while YieldAllFailTasksAtOnce yields all of them in a single list.

If I run YieldFailTaskInBatches, Luigi runs the tasks yielded in the first iteration and, because one of them fails (i = 0, j = 1), the remaining batches are never yielded. If I run YieldAllFailTasksAtOnce, Luigi runs all the tasks as expected.

My question is: how can I tell Luigi to keep running the remaining tasks in YieldFailTaskInBatches, even if some of the tasks failed? Is it possible at all?

The reason I'm asking is that I have around 400k tasks to trigger. I don't want to trigger them all at once, because Luigi would then spend too much time building each task's requirements (each task can have between 1 and 400 requirements). My current solution is to yield them in batches, a few at a time, but if any task in a batch fails, the parent task stops and the remaining batches are never yielded.
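For illustration, the batching described above could be factored into a small helper. This is only a sketch: the name in_batches and the batch size of 4 are assumptions, not part of the original code, and the (i, j) pairs stand in for the real task parameters.

```python
from itertools import islice


def in_batches(items, batch_size):
    """Yield successive lists of at most batch_size items from any iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical (i, j) pairs standing in for the FailTask parameters.
params = ((i, j) for i in range(5) for j in range(2))
batches = list(in_batches(params, 4))
```

Inside run(), each batch would then be turned into a list of FailTask(i, j) instances and yielded. The helper only controls how many tasks are handed to Luigi at a time; it does not change the behaviour described above, where a failure in one batch stops the remaining ones.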

It seems that this issue could solve this problem if implemented, but I'm wondering if there's some other way.

Antione answered 28/11, 2018 at 15:51 Comment(1)
It's related to #48150906, but their workaround didn't work for me. - Antione

This is very hackish, but it should do what you want:

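import luigi


class YieldAll(luigi.Task):
    def run(self):
        # Collect failures instead of stopping at the first one.
        errors = []
        for i in range(5):
            for j in range(2):
                try:
                    # Call run() directly, bypassing the Luigi scheduler.
                    FailTask(i, j).run()
                except Exception as e:
                    errors.append(e)

        # Fail YieldAll itself if any FailTask raised.
        if errors:
            raise ValueError(f'{len(errors)} task(s) failed: {errors}')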

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

Basically, you are running each FailTask outside of the Luigi context. Unless a task writes an output target, Luigi will never know whether it has run or not.

The only task Luigi is aware of is YieldAll. If any of the FailTask runs raises an error, the code catches it and, once the loop finishes, YieldAll itself ends with a failed status.
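The same idea can be written as a reusable helper. The sketch below is an assumption-laden illustration rather than a Luigi API: run_all_collecting_errors and StandInTask are hypothetical names, and StandInTask only mimics the run() and complete() methods that real Luigi tasks expose, so the example runs without Luigi installed. The complete() check is an addition to the answer's approach, so that tasks reporting themselves as done are skipped.

```python
def run_all_collecting_errors(tasks):
    """Call run() on every task directly, collecting exceptions instead of stopping."""
    errors = []
    for task in tasks:
        # Skip tasks that already report themselves as done.
        if task.complete():
            continue
        try:
            task.run()
        except Exception as exc:
            errors.append((task, exc))
    return errors


class StandInTask:
    """Hypothetical stand-in mimicking FailTask; not a real luigi.Task."""

    def __init__(self, i, j):
        self.i, self.j = i, j
        self.done = False

    def complete(self):
        return self.done

    def run(self):
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))
        self.done = True


tasks = [StandInTask(i, j) for i in range(5) for j in range(2)]
errors = run_all_collecting_errors(tasks)
```

With real Luigi tasks, the list passed in would hold FailTask instances, and the caller would raise if errors is non-empty. As with the answer's code, requirements of the inner tasks are not resolved, because the scheduler never sees them.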

Salon answered 5/12, 2018 at 16:57 Comment(4)
Wouldn't yielding the tasks in requires cause them all to be scheduled before any of them is executed? This is what I'm trying to avoid, as I have 400k+ tasks. - Antione
@VitorBaptista This is very hackish, but it is possible; I adjusted the answer. - Salon
Thanks! I agree it's super hacky, and I probably won't use this in production, but your help was super useful. I guess this is the only way to do what I want, unless github.com/spotify/luigi/issues/2516 is implemented... - Antione
Do you have an idea what I can do if FailTask also has a requires method? If I use yield FailTask in line 7, the exception propagates, and using FailTask.run() does not perform the requires part. - Elisha
