Consider the following tasks:
```python
import luigi


class YieldFailTaskInBatches(luigi.Task):
    def run(self):
        # Yield the children in 5 batches of 2; run() only gets past a batch
        # once every task in that batch is complete.
        for i in range(5):
            yield [
                FailTask(i, j)
                for j in range(2)
            ]


class YieldAllFailTasksAtOnce(luigi.Task):
    def run(self):
        # Yield all 10 children as a single dynamic dependency.
        yield [
            FailTask(i, j)
            for j in range(2)
            for i in range(5)
        ]


class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        # Every task with j > 0 fails.
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))
```
FailTask fails if j > 0. YieldFailTaskInBatches yields FailTask instances in batches inside a for loop, while YieldAllFailTasksAtOnce yields all of them at once in a single list.
If I run YieldFailTaskInBatches, Luigi runs the tasks yielded in the first iteration of the loop and, because one of them fails (i = 0, j = 1), the remaining batches are never yielded. If I run YieldAllFailTasksAtOnce, Luigi runs all the tasks, as expected.
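To see why the batched version stops, it may help to model how dynamic dependencies behave. The sketch below is a simplified, pure-Python model, not Luigi's actual scheduler. It assumes that run() is effectively replayed from the start until every yielded dependency is complete, and that a parent whose yielded dependency has failed is never resumed. The names simulate, in_batches, all_at_once and fail_task are hypothetical stand-ins for the tasks above.

```python
from typing import Callable, Iterator, List, Set, Tuple

Params = Tuple[int, int]  # (i, j), standing in for FailTask(i, j)


def fail_task(i: int, j: int) -> None:
    # Mirrors FailTask.run(): every task with j > 0 fails.
    if j > 0:
        raise RuntimeError("i: %d, j: %d" % (i, j))


def in_batches() -> Iterator[List[Params]]:
    # Mirrors YieldFailTaskInBatches.run(): 5 batches of 2.
    for i in range(5):
        yield [(i, j) for j in range(2)]


def all_at_once() -> Iterator[List[Params]]:
    # Mirrors YieldAllFailTasksAtOnce.run(): one list of 10.
    yield [(i, j) for j in range(2) for i in range(5)]


def simulate(parent_run: Callable[[], Iterator[List[Params]]]) -> List[Params]:
    """Return the children that run, in order, under this simplified model."""
    done: Set[Params] = set()
    failed: Set[Params] = set()
    ran: List[Params] = []
    while True:
        # Replay run() from the top, skipping batches that are already complete.
        blocked = None
        for batch in parent_run():
            if not all(p in done for p in batch):
                blocked = batch
                break
        if blocked is None:
            return ran  # every yielded batch is complete: the parent finishes
        if any(p in failed for p in blocked):
            return ran  # a dependency failed: the parent is never resumed
        for p in blocked:
            if p in done:
                continue
            ran.append(p)
            try:
                fail_task(*p)
                done.add(p)
            except RuntimeError:
                failed.add(p)
```

Under this model, simulate(in_batches) runs only (0, 0) and (0, 1) before giving up, while simulate(all_at_once) runs all ten children, which matches the behavior described above.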
My question is: how can I tell Luigi to keep running the remaining tasks in YieldFailTaskInBatches, even if some of the tasks fail? Is that possible at all?
The reason I'm asking is that I have around 400k tasks to trigger. I don't want to trigger them all at once, because that would make Luigi spend too much time building each task's requirements (each task can have between 1 and 400 requirements). My current solution is to yield them in batches, a few at a time, but if any task in a batch fails, the parent task stops and the remaining batches are never yielded.
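One workaround that keeps the batching is to drive the batches from a plain Python script instead of from inside a wrapper task's run(), so that a failed batch cannot block the ones after it. This is a sketch under assumptions: run_batch is a hypothetical callable that runs one batch and returns True on success, and chunked and drive_batches are illustrative helpers, not Luigi APIs.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items, reading `items` lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def drive_batches(
    params: Iterable[Tuple[int, int]],
    size: int,
    run_batch: Callable[[List[Tuple[int, int]]], bool],
) -> List[int]:
    """Run every batch in order and return the indices of the batches that failed.

    A failed batch is only recorded; the loop always continues with the next one.
    """
    failed_batches: List[int] = []
    for index, batch in enumerate(chunked(params, size)):
        if not run_batch(batch):
            failed_batches.append(index)
    return failed_batches
```

In practice, run_batch would presumably build [FailTask(i, j) for i, j in batch] and pass that list to luigi.build (for example with local_scheduler=True), so each call only schedules one batch and its requirements. How to turn the result of luigi.build into a pass/fail flag depends on the Luigi version, so check that against the luigi.build documentation rather than relying on this sketch.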
It seems that this issue, if implemented, would solve this problem, but I'm wondering whether there is another way.